diff --git a/client/consensus/beefy/src/communication/gossip.rs b/client/consensus/beefy/src/communication/gossip.rs index 2b5e772c0578f..376172fc23370 100644 --- a/client/consensus/beefy/src/communication/gossip.rs +++ b/client/consensus/beefy/src/communication/gossip.rs @@ -28,10 +28,17 @@ use log::{debug, trace}; use parking_lot::{Mutex, RwLock}; use wasm_timer::Instant; -use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET}; +use crate::{ + communication::peers::KnownPeers, + justification::{ + proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof, + }, + keystore::BeefyKeystore, + LOG_TARGET, +}; use sp_consensus_beefy::{ - crypto::{Public, Signature}, - ValidatorSetId, VoteMessage, + crypto::{AuthorityId, Signature}, + ValidatorSet, ValidatorSetId, VoteMessage, }; // Timeout for rebroadcasting messages. @@ -40,59 +47,128 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(60); #[cfg(test)] const REBROADCAST_AFTER: Duration = Duration::from_secs(5); -/// Gossip engine messages topic -pub(crate) fn topic() -> B::Hash +/// BEEFY gossip message type that gets encoded and sent on the network. +#[derive(Debug, Encode, Decode)] +pub(crate) enum GossipMessage { + /// BEEFY message with commitment and single signature. + Vote(VoteMessage, AuthorityId, Signature>), + /// BEEFY justification with commitment and signatures. + FinalityProof(BeefyVersionedFinalityProof), +} + +impl GossipMessage { + /// Return inner vote if this message is a Vote. + pub fn unwrap_vote(self) -> Option, AuthorityId, Signature>> { + match self { + GossipMessage::Vote(vote) => Some(vote), + GossipMessage::FinalityProof(_) => None, + } + } + + /// Return inner finality proof if this message is a FinalityProof. + pub fn unwrap_finality_proof(self) -> Option> { + match self { + GossipMessage::Vote(_) => None, + GossipMessage::FinalityProof(proof) => Some(proof), + } + } +} + +/// Gossip engine votes messages topic +pub(crate) fn votes_topic() -> B::Hash where B: Block, { - <::Hashing as Hash>::hash(b"beefy") + <::Hashing as Hash>::hash(b"beefy-votes") } -#[derive(Debug)] -pub(crate) struct GossipVoteFilter { - pub start: NumberFor, - pub end: NumberFor, - pub validator_set_id: ValidatorSetId, +/// Gossip engine justifications messages topic +pub(crate) fn proofs_topic() -> B::Hash +where + B: Block, +{ + <::Hashing as Hash>::hash(b"beefy-justifications") } /// A type that represents hash of the message. pub type MessageHash = [u8; 8]; -struct VotesFilter { - filter: Option>, - live: BTreeMap, fnv::FnvHashSet>, +#[derive(Clone, Debug)] +pub(crate) struct GossipFilterCfg<'a, B: Block> { + pub start: NumberFor, + pub end: NumberFor, + pub validator_set: &'a ValidatorSet, +} + +#[derive(Clone, Debug)] +struct FilterInner { + pub start: NumberFor, + pub end: NumberFor, + pub validator_set: ValidatorSet, +} + +struct Filter { + inner: Option>, + live_votes: BTreeMap, fnv::FnvHashSet>, } -impl VotesFilter { +impl Filter { pub fn new() -> Self { - Self { filter: None, live: BTreeMap::new() } + Self { inner: None, live_votes: BTreeMap::new() } } /// Update filter to new `start` and `set_id`. - fn update(&mut self, filter: GossipVoteFilter) { - self.live.retain(|&round, _| round >= filter.start && round <= filter.end); - self.filter = Some(filter); + fn update(&mut self, cfg: GossipFilterCfg) { + self.live_votes.retain(|&round, _| round >= cfg.start && round <= cfg.end); + // only clone+overwrite big validator_set if set_id changed + match self.inner.as_mut() { + Some(f) if f.validator_set.id() == cfg.validator_set.id() => { + f.start = cfg.start; + f.end = cfg.end; + }, + _ => + self.inner = Some(FilterInner { + start: cfg.start, + end: cfg.end, + validator_set: cfg.validator_set.clone(), + }), + } + } + + /// Return true if `max(session_start, best_beefy) <= round <= best_grandpa`, + /// and vote `set_id` matches session set id. + /// + /// Latest concluded round is still considered alive to allow proper gossiping for it. + fn is_vote_accepted(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + self.inner + .as_ref() + .map(|f| set_id == f.validator_set.id() && round >= f.start && round <= f.end) + .unwrap_or(false) } /// Return true if `round` is >= than `max(session_start, best_beefy)`, - /// and vote set id matches session set id. + /// and proof `set_id` matches session set id. /// /// Latest concluded round is still considered alive to allow proper gossiping for it. - fn is_live(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { - self.filter + fn is_finality_proof_accepted(&self, round: NumberFor, set_id: ValidatorSetId) -> bool { + self.inner .as_ref() - .map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end) + .map(|f| set_id == f.validator_set.id() && round >= f.start) .unwrap_or(false) } /// Add new _known_ `hash` to the round's known votes. - fn add_known(&mut self, round: NumberFor, hash: MessageHash) { - self.live.entry(round).or_default().insert(hash); + fn add_known_vote(&mut self, round: NumberFor, hash: MessageHash) { + self.live_votes.entry(round).or_default().insert(hash); } /// Check if `hash` is already part of round's known votes. - fn is_known(&self, round: NumberFor, hash: &MessageHash) -> bool { - self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false) + fn is_known_vote(&self, round: NumberFor, hash: &MessageHash) -> bool { + self.live_votes.get(&round).map(|known| known.contains(hash)).unwrap_or(false) + } + + fn validator_set(&self) -> Option<&ValidatorSet> { + self.inner.as_ref().map(|f| &f.validator_set) } } @@ -108,8 +184,9 @@ pub(crate) struct GossipValidator where B: Block, { - topic: B::Hash, - votes_filter: RwLock>, + votes_topic: B::Hash, + justifs_topic: B::Hash, + gossip_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, } @@ -120,8 +197,9 @@ where { pub fn new(known_peers: Arc>>) -> GossipValidator { GossipValidator { - topic: topic::(), - votes_filter: RwLock::new(VotesFilter::new()), + votes_topic: votes_topic::(), + justifs_topic: proofs_topic::(), + gossip_filter: RwLock::new(Filter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, } @@ -130,9 +208,79 @@ where /// Update gossip validator filter. /// /// Only votes for `set_id` and rounds `start <= round <= end` will be accepted. - pub(crate) fn update_filter(&self, filter: GossipVoteFilter) { + pub(crate) fn update_filter(&self, filter: GossipFilterCfg) { debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter); - self.votes_filter.write().update(filter); + self.gossip_filter.write().update(filter); + } + + fn validate_vote( + &self, + vote: VoteMessage, AuthorityId, Signature>, + sender: &PeerId, + data: &[u8], + ) -> ValidationResult { + let msg_hash = twox_64(data); + let round = vote.commitment.block_number; + let set_id = vote.commitment.validator_set_id; + self.known_peers.lock().note_vote_for(*sender, round); + + // Verify general usefulness of the message. + // We are going to discard old votes right away (without verification) + // Also we keep track of already received votes to avoid verifying duplicates. + { + let filter = self.gossip_filter.read(); + + if !filter.is_vote_accepted(round, set_id) { + return ValidationResult::Discard + } + + if filter.is_known_vote(round, &msg_hash) { + return ValidationResult::ProcessAndKeep(self.votes_topic) + } + } + + if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) { + self.gossip_filter.write().add_known_vote(round, msg_hash); + ValidationResult::ProcessAndKeep(self.votes_topic) + } else { + // TODO: report peer + debug!( + target: LOG_TARGET, + "🥩 Bad signature on message: {:?}, from: {:?}", vote, sender + ); + ValidationResult::Discard + } + } + + fn validate_finality_proof( + &self, + proof: BeefyVersionedFinalityProof, + sender: &PeerId, + ) -> ValidationResult { + let (round, set_id) = proof_block_num_and_set_id::(&proof); + self.known_peers.lock().note_vote_for(*sender, round); + + let guard = self.gossip_filter.read(); + // Verify general usefulness of the justifications. + if !guard.is_finality_proof_accepted(round, set_id) { + return ValidationResult::Discard + } + // Verify justification signatures. + guard + .validator_set() + .map(|validator_set| { + if let Ok(()) = verify_with_validator_set::(round, validator_set, &proof) { + ValidationResult::ProcessAndKeep(self.justifs_topic) + } else { + // TODO: report peer + debug!( + target: LOG_TARGET, + "🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender + ); + ValidationResult::Discard + } + }) + .unwrap_or(ValidationResult::Discard) } } @@ -150,57 +298,38 @@ where sender: &PeerId, mut data: &[u8], ) -> ValidationResult { - if let Ok(msg) = VoteMessage::, Public, Signature>::decode(&mut data) { - let msg_hash = twox_64(data); - let round = msg.commitment.block_number; - let set_id = msg.commitment.validator_set_id; - self.known_peers.lock().note_vote_for(*sender, round); - - // Verify general usefulness of the message. - // We are going to discard old votes right away (without verification) - // Also we keep track of already received votes to avoid verifying duplicates. - { - let filter = self.votes_filter.read(); - - if !filter.is_live(round, set_id) { - return ValidationResult::Discard - } - - if filter.is_known(round, &msg_hash) { - return ValidationResult::ProcessAndKeep(self.topic) - } - } - - if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { - self.votes_filter.write().add_known(round, msg_hash); - return ValidationResult::ProcessAndKeep(self.topic) - } else { - // TODO: report peer - debug!( - target: LOG_TARGET, - "🥩 Bad signature on message: {:?}, from: {:?}", msg, sender - ); - } + match GossipMessage::::decode(&mut data) { + Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, data), + Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender), + Err(e) => { + debug!(target: LOG_TARGET, "Error decoding message: {}", e); + ValidationResult::Discard + }, } - - ValidationResult::Discard } fn message_expired<'a>(&'a self) -> Box bool + 'a> { - let filter = self.votes_filter.read(); - Box::new(move |_topic, mut data| { - let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { - Ok(vote) => vote, - Err(_) => return true, - }; - - let round = msg.commitment.block_number; - let set_id = msg.commitment.validator_set_id; - let expired = !filter.is_live(round, set_id); - - trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired); - - expired + let filter = self.gossip_filter.read(); + Box::new(move |_topic, mut data| match GossipMessage::::decode(&mut data) { + Ok(GossipMessage::Vote(msg)) => { + let round = msg.commitment.block_number; + let set_id = msg.commitment.validator_set_id; + let expired = !filter.is_vote_accepted(round, set_id); + trace!(target: LOG_TARGET, "🥩 Vote for round #{} expired: {}", round, expired); + expired + }, + Ok(GossipMessage::FinalityProof(proof)) => { + let (round, set_id) = proof_block_num_and_set_id::(&proof); + let expired = !filter.is_finality_proof_accepted(round, set_id); + trace!( + target: LOG_TARGET, + "🥩 Finality proof for round #{} expired: {}", + round, + expired + ); + expired + }, + Err(_) => true, }) } @@ -219,68 +348,80 @@ where } }; - let filter = self.votes_filter.read(); + let filter = self.gossip_filter.read(); Box::new(move |_who, intent, _topic, mut data| { if let MessageIntent::PeriodicRebroadcast = intent { return do_rebroadcast } - let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { - Ok(vote) => vote, - Err(_) => return false, - }; - - let round = msg.commitment.block_number; - let set_id = msg.commitment.validator_set_id; - let allowed = filter.is_live(round, set_id); - - trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed); - - allowed + match GossipMessage::::decode(&mut data) { + Ok(GossipMessage::Vote(msg)) => { + let round = msg.commitment.block_number; + let set_id = msg.commitment.validator_set_id; + let allowed = filter.is_vote_accepted(round, set_id); + trace!(target: LOG_TARGET, "🥩 Vote for round #{} allowed: {}", round, allowed); + allowed + }, + Ok(GossipMessage::FinalityProof(proof)) => { + let (round, set_id) = proof_block_num_and_set_id::(&proof); + let allowed = filter.is_finality_proof_accepted(round, set_id); + trace!( + target: LOG_TARGET, + "🥩 Finality proof for round #{} allowed: {}", + round, + allowed + ); + allowed + }, + Err(_) => false, + } }) } } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use crate::keystore::BeefyKeystore; use sc_network_test::Block; use sp_consensus_beefy::{ - crypto::Signature, known_payloads, Commitment, Keyring, MmrRootHash, Payload, VoteMessage, - KEY_TYPE, + crypto::Signature, known_payloads, Commitment, Keyring, MmrRootHash, Payload, + SignedCommitment, VoteMessage, KEY_TYPE, }; use sp_keystore::{testing::MemoryKeystore, Keystore}; #[test] fn known_votes_insert_remove() { - let mut kv = VotesFilter::::new(); + let mut filter = Filter::::new(); let msg_hash = twox_64(b"data"); - - kv.add_known(1, msg_hash); - kv.add_known(1, msg_hash); - kv.add_known(2, msg_hash); - assert_eq!(kv.live.len(), 2); - - kv.add_known(3, msg_hash); - assert!(kv.is_known(3, &msg_hash)); - assert!(!kv.is_known(3, &twox_64(b"other"))); - assert!(!kv.is_known(4, &msg_hash)); - assert_eq!(kv.live.len(), 3); - - assert!(kv.filter.is_none()); - assert!(!kv.is_live(1, 1)); - - kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 }); - assert_eq!(kv.live.len(), 1); - assert!(kv.live.contains_key(&3)); - assert!(!kv.is_live(2, 1)); - assert!(kv.is_live(3, 1)); - assert!(kv.is_live(4, 1)); - assert!(!kv.is_live(4, 2)); - - kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 }); - assert!(kv.live.is_empty()); + let keys = vec![Keyring::Alice.public()]; + let validator_set = ValidatorSet::::new(keys.clone(), 1).unwrap(); + + filter.add_known_vote(1, msg_hash); + filter.add_known_vote(1, msg_hash); + filter.add_known_vote(2, msg_hash); + assert_eq!(filter.live_votes.len(), 2); + + filter.add_known_vote(3, msg_hash); + assert!(filter.is_known_vote(3, &msg_hash)); + assert!(!filter.is_known_vote(3, &twox_64(b"other"))); + assert!(!filter.is_known_vote(4, &msg_hash)); + assert_eq!(filter.live_votes.len(), 3); + + assert!(filter.inner.is_none()); + assert!(!filter.is_vote_accepted(1, 1)); + + filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set }); + assert_eq!(filter.live_votes.len(), 1); + assert!(filter.live_votes.contains_key(&3)); + assert!(!filter.is_vote_accepted(2, 1)); + assert!(filter.is_vote_accepted(3, 1)); + assert!(filter.is_vote_accepted(4, 1)); + assert!(!filter.is_vote_accepted(4, 2)); + + let validator_set = ValidatorSet::::new(keys, 2).unwrap(); + filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set }); + assert!(filter.live_votes.is_empty()); } struct TestContext; @@ -302,14 +443,14 @@ mod tests { } } - fn sign_commitment(who: &Keyring, commitment: &Commitment) -> Signature { + pub fn sign_commitment(who: &Keyring, commitment: &Commitment) -> Signature { let store = MemoryKeystore::new(); store.ecdsa_generate_new(KEY_TYPE, Some(&who.to_seed())).unwrap(); let beefy_keystore: BeefyKeystore = Some(store.into()).into(); beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap() } - fn dummy_vote(block_number: u64) -> VoteMessage { + fn dummy_vote(block_number: u64) -> VoteMessage { let payload = Payload::from_single_entry( known_payloads::MMR_ROOT_ID, MmrRootHash::default().encode(), @@ -320,51 +461,111 @@ mod tests { VoteMessage { commitment, id: Keyring::Alice.public(), signature } } + pub fn dummy_proof( + block_number: u64, + validator_set: &ValidatorSet, + ) -> BeefyVersionedFinalityProof { + let payload = Payload::from_single_entry( + known_payloads::MMR_ROOT_ID, + MmrRootHash::default().encode(), + ); + let commitment = Commitment { payload, block_number, validator_set_id: validator_set.id() }; + let signatures = validator_set + .validators() + .iter() + .map(|validator: &AuthorityId| { + Some(sign_commitment(&Keyring::from_public(validator).unwrap(), &commitment)) + }) + .collect(); + + BeefyVersionedFinalityProof::::V1(SignedCommitment { commitment, signatures }) + } + #[test] - fn should_avoid_verifying_signatures_twice() { + fn should_validate_messages() { + let keys = vec![Keyring::Alice.public()]; + let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); + gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let mut context = TestContext; let vote = dummy_vote(3); + let gossip_vote = GossipMessage::::Vote(vote.clone()); // first time the cache should be populated - let res = gv.validate(&mut context, &sender, &vote.encode()); + let res = gv.validate(&mut context, &sender, &gossip_vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); assert_eq!( - gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()), + gv.gossip_filter + .read() + .live_votes + .get(&vote.commitment.block_number) + .map(|x| x.len()), Some(1) ); // second time we should hit the cache - let res = gv.validate(&mut context, &sender, &vote.encode()); - + let res = gv.validate(&mut context, &sender, &gossip_vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); // next we should quickly reject if the round is not live - gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 }); + gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set }); let number = vote.commitment.block_number; let set_id = vote.commitment.validator_set_id; - assert!(!gv.votes_filter.read().is_live(number, set_id)); + assert!(!gv.gossip_filter.read().is_vote_accepted(number, set_id)); let res = gv.validate(&mut context, &sender, &vote.encode()); + assert!(matches!(res, ValidationResult::Discard)); + + // reject old proof + let proof = dummy_proof(5, &validator_set); + let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + let res = gv.validate(&mut context, &sender, &encoded_proof); + assert!(matches!(res, ValidationResult::Discard)); + + // accept next proof with good set_id + let proof = dummy_proof(7, &validator_set); + let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + let res = gv.validate(&mut context, &sender, &encoded_proof); + assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + + // accept future proof with good set_id + let proof = dummy_proof(20, &validator_set); + let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + let res = gv.validate(&mut context, &sender, &encoded_proof); + assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); + // reject proof, wrong set_id + let bad_validator_set = ValidatorSet::::new(keys, 1).unwrap(); + let proof = dummy_proof(20, &bad_validator_set); + let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + let res = gv.validate(&mut context, &sender, &encoded_proof); + assert!(matches!(res, ValidationResult::Discard)); + + // reject proof, bad signatures (Bob instead of Alice) + let bad_validator_set = + ValidatorSet::::new(vec![Keyring::Bob.public()], 0).unwrap(); + let proof = dummy_proof(20, &bad_validator_set); + let encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); } #[test] fn messages_allowed_and_expired() { + let keys = vec![Keyring::Alice.public()]; + let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); + gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); let intent = MessageIntent::Broadcast; // conclude 2 - gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 }); + gv.update_filter(GossipFilterCfg { start: 2, end: 10, validator_set: &validator_set }); let mut allowed = gv.message_allowed(); let mut expired = gv.message_expired(); @@ -374,33 +575,68 @@ mod tests { // inactive round 1 -> expired let vote = dummy_vote(1); - let mut encoded_vote = vote.encode(); + let mut encoded_vote = GossipMessage::::Vote(vote).encode(); assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); assert!(expired(topic, &mut encoded_vote)); + let proof = dummy_proof(1, &validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(!allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(expired(topic, &mut encoded_proof)); // active round 2 -> !expired - concluded but still gossiped let vote = dummy_vote(2); - let mut encoded_vote = vote.encode(); + let mut encoded_vote = GossipMessage::::Vote(vote).encode(); assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); assert!(!expired(topic, &mut encoded_vote)); + let proof = dummy_proof(2, &validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(!expired(topic, &mut encoded_proof)); + // using wrong set_id -> !allowed, expired + let bad_validator_set = ValidatorSet::::new(keys.clone(), 1).unwrap(); + let proof = dummy_proof(2, &bad_validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(!allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(expired(topic, &mut encoded_proof)); // in progress round 3 -> !expired let vote = dummy_vote(3); - let mut encoded_vote = vote.encode(); + let mut encoded_vote = GossipMessage::::Vote(vote).encode(); assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); assert!(!expired(topic, &mut encoded_vote)); + let proof = dummy_proof(3, &validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(!expired(topic, &mut encoded_proof)); // unseen round 4 -> !expired - let vote = dummy_vote(3); - let mut encoded_vote = vote.encode(); + let vote = dummy_vote(4); + let mut encoded_vote = GossipMessage::::Vote(vote).encode(); assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); assert!(!expired(topic, &mut encoded_vote)); + let proof = dummy_proof(4, &validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(!expired(topic, &mut encoded_proof)); + + // future round 11 -> expired + let vote = dummy_vote(11); + let mut encoded_vote = GossipMessage::::Vote(vote).encode(); + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(expired(topic, &mut encoded_vote)); + // future proofs allowed while same set_id -> allowed + let proof = dummy_proof(11, &validator_set); + let mut encoded_proof = GossipMessage::::FinalityProof(proof).encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_proof)); + assert!(!expired(topic, &mut encoded_proof)); } #[test] fn messages_rebroadcast() { + let keys = vec![Keyring::Alice.public()]; + let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); - gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 }); + gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/client/consensus/beefy/src/communication/mod.rs b/client/consensus/beefy/src/communication/mod.rs index 295d549bb1ba8..13735a9d3211b 100644 --- a/client/consensus/beefy/src/communication/mod.rs +++ b/client/consensus/beefy/src/communication/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod beefy_protocol_name { use sc_network::ProtocolName; /// BEEFY votes gossip protocol name suffix. - const GOSSIP_NAME: &str = "/beefy/1"; + const GOSSIP_NAME: &str = "/beefy/2"; /// BEEFY justifications protocol name suffix. const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1"; @@ -86,7 +86,7 @@ mod tests { let genesis_hash = H256::random(); let genesis_hex = array_bytes::bytes2hex("", genesis_hash.as_ref()); - let expected_gossip_name = format!("/{}/beefy/1", genesis_hex); + let expected_gossip_name = format!("/{}/beefy/2", genesis_hex); let gossip_proto_name = gossip_protocol_name(&genesis_hash, None); assert_eq!(gossip_proto_name.to_string(), expected_gossip_name); @@ -101,7 +101,7 @@ mod tests { ]; let genesis_hex = "32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93"; - let expected_gossip_name = format!("/{}/beefy/1", genesis_hex); + let expected_gossip_name = format!("/{}/beefy/2", genesis_hex); let gossip_proto_name = gossip_protocol_name(&genesis_hash, None); assert_eq!(gossip_proto_name.to_string(), expected_gossip_name); diff --git a/client/consensus/beefy/src/justification.rs b/client/consensus/beefy/src/justification.rs index 1bd250b2a25f3..5175fd17d4ea3 100644 --- a/client/consensus/beefy/src/justification.rs +++ b/client/consensus/beefy/src/justification.rs @@ -21,13 +21,21 @@ use codec::{Decode, Encode}; use sp_consensus::Error as ConsensusError; use sp_consensus_beefy::{ crypto::{AuthorityId, Signature}, - ValidatorSet, VersionedFinalityProof, + ValidatorSet, ValidatorSetId, VersionedFinalityProof, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; /// A finality proof with matching BEEFY authorities' signatures. -pub type BeefyVersionedFinalityProof = - sp_consensus_beefy::VersionedFinalityProof, Signature>; +pub type BeefyVersionedFinalityProof = VersionedFinalityProof, Signature>; + +pub(crate) fn proof_block_num_and_set_id( + proof: &BeefyVersionedFinalityProof, +) -> (NumberFor, ValidatorSetId) { + match proof { + VersionedFinalityProof::V1(sc) => + (sc.commitment.block_number, sc.commitment.validator_set_id), + } +} /// Decode and verify a Beefy FinalityProof. pub(crate) fn decode_and_verify_finality_proof( @@ -41,7 +49,7 @@ pub(crate) fn decode_and_verify_finality_proof( } /// Verify the Beefy finality proof against the validator set at the block it was generated. -fn verify_with_validator_set( +pub(crate) fn verify_with_validator_set( target_number: NumberFor, validator_set: &ValidatorSet, proof: &BeefyVersionedFinalityProof, diff --git a/client/consensus/beefy/src/lib.rs b/client/consensus/beefy/src/lib.rs index b84fa45e7e2f3..3c66cc6eb716d 100644 --- a/client/consensus/beefy/src/lib.rs +++ b/client/consensus/beefy/src/lib.rs @@ -288,7 +288,7 @@ pub async fn start_beefy_gadget( }; // Update the gossip validator with the right starting round and set id. if let Err(e) = persisted_state - .current_gossip_filter() + .gossip_filter_config() .map(|f| gossip_validator.update_filter(f)) { error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); diff --git a/client/consensus/beefy/src/round.rs b/client/consensus/beefy/src/round.rs index 64d03beeee854..d8948ff98c552 100644 --- a/client/consensus/beefy/src/round.rs +++ b/client/consensus/beefy/src/round.rs @@ -21,7 +21,7 @@ use crate::LOG_TARGET; use codec::{Decode, Encode}; use log::debug; use sp_consensus_beefy::{ - crypto::{AuthorityId, Public, Signature}, + crypto::{AuthorityId, Signature}, Commitment, EquivocationProof, SignedCommitment, ValidatorSet, ValidatorSetId, VoteMessage, }; use sp_runtime::traits::{Block, NumberFor}; @@ -33,11 +33,11 @@ use std::collections::BTreeMap; /// Does not do any validation on votes or signatures, layers above need to handle that (gossip). #[derive(Debug, Decode, Default, Encode, PartialEq)] pub(crate) struct RoundTracker { - votes: BTreeMap, + votes: BTreeMap, } impl RoundTracker { - fn add_vote(&mut self, vote: (Public, Signature)) -> bool { + fn add_vote(&mut self, vote: (AuthorityId, Signature)) -> bool { if self.votes.contains_key(&vote.0) { return false } @@ -61,7 +61,7 @@ pub fn threshold(authorities: usize) -> usize { pub enum VoteImportResult { Ok, RoundConcluded(SignedCommitment, Signature>), - Equivocation(EquivocationProof, Public, Signature>), + Equivocation(EquivocationProof, AuthorityId, Signature>), Invalid, Stale, } @@ -73,9 +73,10 @@ pub enum VoteImportResult { #[derive(Debug, Decode, Encode, PartialEq)] pub(crate) struct Rounds { rounds: BTreeMap>, RoundTracker>, - previous_votes: BTreeMap<(Public, NumberFor), VoteMessage, Public, Signature>>, + previous_votes: + BTreeMap<(AuthorityId, NumberFor), VoteMessage, AuthorityId, Signature>>, session_start: NumberFor, - validator_set: ValidatorSet, + validator_set: ValidatorSet, mandatory_done: bool, best_done: Option>, } @@ -84,7 +85,10 @@ impl Rounds where B: Block, { - pub(crate) fn new(session_start: NumberFor, validator_set: ValidatorSet) -> Self { + pub(crate) fn new( + session_start: NumberFor, + validator_set: ValidatorSet, + ) -> Self { Rounds { rounds: BTreeMap::new(), previous_votes: BTreeMap::new(), @@ -95,7 +99,7 @@ where } } - pub(crate) fn validator_set(&self) -> &ValidatorSet { + pub(crate) fn validator_set(&self) -> &ValidatorSet { &self.validator_set } @@ -103,7 +107,7 @@ where self.validator_set.id() } - pub(crate) fn validators(&self) -> &[Public] { + pub(crate) fn validators(&self) -> &[AuthorityId] { self.validator_set.validators() } @@ -199,11 +203,11 @@ mod tests { use sc_network_test::Block; use sp_consensus_beefy::{ - crypto::Public, known_payloads::MMR_ROOT_ID, Commitment, EquivocationProof, Keyring, - Payload, SignedCommitment, ValidatorSet, VoteMessage, + known_payloads::MMR_ROOT_ID, Commitment, EquivocationProof, Keyring, Payload, + SignedCommitment, ValidatorSet, VoteMessage, }; - use super::{threshold, Block as BlockT, RoundTracker, Rounds}; + use super::{threshold, AuthorityId, Block as BlockT, RoundTracker, Rounds}; use crate::round::VoteImportResult; impl Rounds @@ -251,7 +255,7 @@ mod tests { fn new_rounds() { sp_tracing::try_init_simple(); - let validators = ValidatorSet::::new( + let validators = ValidatorSet::::new( vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], 42, ) @@ -272,7 +276,7 @@ mod tests { fn add_and_conclude_votes() { sp_tracing::try_init_simple(); - let validators = ValidatorSet::::new( + let validators = ValidatorSet::::new( vec![ Keyring::Alice.public(), Keyring::Bob.public(), @@ -338,7 +342,7 @@ mod tests { fn old_rounds_not_accepted() { sp_tracing::try_init_simple(); - let validators = ValidatorSet::::new( + let validators = ValidatorSet::::new( vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], 42, ) @@ -384,7 +388,7 @@ mod tests { fn multiple_rounds() { sp_tracing::try_init_simple(); - let validators = ValidatorSet::::new( + let validators = ValidatorSet::::new( vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], Default::default(), ) @@ -459,7 +463,7 @@ mod tests { fn should_provide_equivocation_proof() { sp_tracing::try_init_simple(); - let validators = ValidatorSet::::new( + let validators = ValidatorSet::::new( vec![Keyring::Alice.public(), Keyring::Bob.public()], Default::default(), ) diff --git a/client/consensus/beefy/src/tests.rs b/client/consensus/beefy/src/tests.rs index 0ad5f10886093..f36c2cd68f97f 100644 --- a/client/consensus/beefy/src/tests.rs +++ b/client/consensus/beefy/src/tests.rs @@ -21,15 +21,18 @@ use crate::{ aux_schema::{load_persistent, tests::verify_persisted_version}, beefy_block_import_and_links, - communication::request_response::{ - on_demand_justifications_protocol_config, BeefyJustifsRequestHandler, + communication::{ + gossip::{ + proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage, + }, + request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, gossip_protocol_name, justification::*, load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers, PersistedState, }; -use futures::{future, stream::FuturesUnordered, Future, StreamExt}; +use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend}; use sc_consensus::{ @@ -48,16 +51,16 @@ use sp_consensus::BlockOrigin; use sp_consensus_beefy::{ crypto::{AuthorityId, Signature}, known_payloads, - mmr::MmrRootProvider, + mmr::{find_mmr_root_digest, MmrRootProvider}, BeefyApi, Commitment, ConsensusLog, EquivocationProof, Keyring as BeefyKeyring, MmrRootHash, OpaqueKeyOwnershipProof, Payload, SignedCommitment, ValidatorSet, ValidatorSetId, - VersionedFinalityProof, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType, + VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType, }; use sp_core::H256; use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr}; use sp_mmr_primitives::{Error as MmrError, MmrApi}; use sp_runtime::{ - codec::Encode, + codec::{Decode, Encode}, traits::{Header as HeaderT, NumberFor}, BuildStorage, DigestItem, EncodedJustification, Justifications, Storage, }; @@ -503,16 +506,15 @@ async fn wait_for_beefy_signed_commitments( run_until(wait_for, net).await; } -async fn streams_empty_after_timeout( +async fn streams_empty_after_future( streams: Vec>, - net: &Arc>, - timeout: Option, + future: Option, ) where T: std::fmt::Debug, T: std::cmp::PartialEq, { - if let Some(timeout) = timeout { - run_for(timeout, net).await; + if let Some(future) = future { + future.await; } for mut stream in streams.into_iter() { future::poll_fn(move |cx| { @@ -523,6 +525,18 @@ async fn streams_empty_after_timeout( } } +async fn streams_empty_after_timeout( + streams: Vec>, + net: &Arc>, + timeout: Option, +) where + T: std::fmt::Debug, + T: std::cmp::PartialEq, +{ + let timeout = timeout.map(|timeout| Box::pin(run_for(timeout, net))); + streams_empty_after_future(streams, timeout).await; +} + async fn finalize_block_and_wait_for_beefy( net: &Arc>, // peer index and key @@ -1229,3 +1243,143 @@ async fn beefy_reports_equivocations() { assert_eq!(equivocation_proof.first.id, BeefyKeyring::Bob.public()); assert_eq!(equivocation_proof.first.commitment.block_number, 1); } + +#[tokio::test] +async fn gossipped_finality_proofs() { + sp_tracing::try_init_simple(); + + let validators = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie]; + // Only Alice and Bob are running the voter -> finality threshold not reached + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(&validators), 0).unwrap(); + let session_len = 30; + let min_block_delta = 1; + + let mut net = BeefyTestNet::new(3); + let api = Arc::new(TestApi::with_validator_set(&validator_set)); + let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); + + let charlie = &net.peers[2]; + let known_peers = Arc::new(Mutex::new(KnownPeers::::new())); + // Charlie will run just the gossip engine and not the full voter. + let charlie_gossip_validator = + Arc::new(crate::communication::gossip::GossipValidator::new(known_peers)); + charlie_gossip_validator.update_filter(GossipFilterCfg:: { + start: 1, + end: 10, + validator_set: &validator_set, + }); + let mut charlie_gossip_engine = sc_network_gossip::GossipEngine::new( + charlie.network_service().clone(), + charlie.sync_service().clone(), + beefy_gossip_proto_name(), + charlie_gossip_validator.clone(), + None, + ); + + // Alice and Bob run full voter. + tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); + + let net = Arc::new(Mutex::new(net)); + + // Pump net + Charlie gossip to see peers. + let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(200))); + let gossip_engine_pump = &mut charlie_gossip_engine; + let pump_with_timeout = future::select(gossip_engine_pump, timeout); + run_until(pump_with_timeout, &net).await; + + // push 10 blocks + let hashes = net.lock().generate_blocks_and_sync(10, session_len, &validator_set, true).await; + + let peers = peers.into_iter().enumerate(); + + // Alice, Bob and Charlie finalize #1, Alice and Bob vote on it, but not Charlie. + let finalize = hashes[1]; + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); + net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); + // verify nothing gets finalized by BEEFY + let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(100))); + let pump_net = futures::future::poll_fn(|cx| { + net.lock().poll(cx); + Poll::<()>::Pending + }); + let pump_gossip = &mut charlie_gossip_engine; + let pump_with_timeout = future::select(pump_gossip, future::select(pump_net, timeout)); + streams_empty_after_future(best_blocks, Some(pump_with_timeout)).await; + streams_empty_after_timeout(versioned_finality_proof, &net, None).await; + + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); + // Charlie gossips finality proof for #1 -> Alice and Bob also finalize. + let proof = crate::communication::gossip::tests::dummy_proof(1, &validator_set); + let gossip_proof = GossipMessage::::FinalityProof(proof); + let encoded_proof = gossip_proof.encode(); + charlie_gossip_engine.gossip_message(proofs_topic::(), encoded_proof, true); + // Expect #1 is finalized. + wait_for_best_beefy_blocks(best_blocks, &net, &[1]).await; + wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[1]).await; + + // Code above verifies gossipped finality proofs are correctly imported and consumed by voters. + // Next, let's verify finality proofs are correctly generated and gossipped by voters. + + // Everyone finalizes #2 + let block_number = 2u64; + let finalize = hashes[block_number as usize]; + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); + net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap(); + + // Simulate Charlie vote on #2 + let header = net.lock().peer(2).client().as_client().expect_header(finalize).unwrap(); + let mmr_root = find_mmr_root_digest::(&header).unwrap(); + let payload = Payload::from_single_entry(known_payloads::MMR_ROOT_ID, mmr_root.encode()); + let commitment = Commitment { payload, block_number, validator_set_id: validator_set.id() }; + let signature = sign_commitment(&BeefyKeyring::Charlie, &commitment); + let vote_message = VoteMessage { commitment, id: BeefyKeyring::Charlie.public(), signature }; + let encoded_vote = GossipMessage::::Vote(vote_message).encode(); + charlie_gossip_engine.gossip_message(votes_topic::(), encoded_vote, true); + + // Expect #2 is finalized. + wait_for_best_beefy_blocks(best_blocks, &net, &[2]).await; + wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[2]).await; + + // Now verify Charlie also sees the gossipped proof generated by either Alice or Bob. + let mut charlie_gossip_proofs = Box::pin( + charlie_gossip_engine + .messages_for(proofs_topic::()) + .filter_map(|notification| async move { + GossipMessage::::decode(&mut ¬ification.message[..]).ok().and_then( + |message| match message { + GossipMessage::::Vote(_) => unreachable!(), + GossipMessage::::FinalityProof(proof) => Some(proof), + }, + ) + }) + .fuse(), + ); + loop { + let pump_net = futures::future::poll_fn(|cx| { + net.lock().poll(cx); + Poll::<()>::Pending + }); + let mut gossip_engine = &mut charlie_gossip_engine; + futures::select! { + // pump gossip engine + _ = gossip_engine => unreachable!(), + // pump network + _ = pump_net.fuse() => unreachable!(), + // verify finality proof has been gossipped + proof = charlie_gossip_proofs.next() => { + let proof = proof.unwrap(); + let (round, _) = proof_block_num_and_set_id::(&proof); + match round { + 1 => continue, // finality proof generated by Charlie in the previous round + 2 => break, // finality proof generated by Alice or Bob and gossiped to Charlie + _ => panic!("Charlie got unexpected finality proof"), + } + }, + } + } +} diff --git a/client/consensus/beefy/src/worker.rs b/client/consensus/beefy/src/worker.rs index 0260d7693c654..19225ec214578 100644 --- a/client/consensus/beefy/src/worker.rs +++ b/client/consensus/beefy/src/worker.rs @@ -18,7 +18,7 @@ use crate::{ communication::{ - gossip::{topic, GossipValidator, GossipVoteFilter}, + gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator}, request_response::outgoing_requests_engine::OnDemandJustificationsEngine, }, error::Error, @@ -42,7 +42,7 @@ use sp_consensus_beefy::{ check_equivocation_proof, crypto::{AuthorityId, Signature}, BeefyApi, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, ValidatorSet, - ValidatorSetId, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use sp_runtime::{ generic::OpaqueDigestItemId, @@ -158,8 +158,8 @@ impl VoterOracle { self.sessions.front_mut().ok_or(Error::UninitSession) } - fn current_validator_set_id(&self) -> Result { - self.active_rounds().map(|r| r.validator_set_id()) + fn current_validator_set(&self) -> Result<&ValidatorSet, Error> { + self.active_rounds().map(|r| r.validator_set()) } // Prune the sessions queue to keep the Oracle in one of the expected three states. @@ -301,10 +301,10 @@ impl PersistedState { self.voting_oracle.best_grandpa_block_header = best_grandpa; } - pub(crate) fn current_gossip_filter(&self) -> Result, Error> { + pub(crate) fn gossip_filter_config(&self) -> Result, Error> { let (start, end) = self.voting_oracle.accepted_interval()?; - let validator_set_id = self.voting_oracle.current_validator_set_id()?; - Ok(GossipVoteFilter { start, end, validator_set_id }) + let validator_set = self.voting_oracle.current_validator_set()?; + Ok(GossipFilterCfg { start, end, validator_set }) } } @@ -494,7 +494,7 @@ where // Update gossip validator votes filter. if let Err(e) = self .persisted_state - .current_gossip_filter() + .gossip_filter_config() .map(|filter| self.gossip_validator.update_filter(filter)) { error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e); @@ -509,7 +509,12 @@ where ) -> Result<(), Error> { let block_num = vote.commitment.block_number; match self.voting_oracle().triage_round(block_num)? { - RoundAction::Process => self.handle_vote(vote)?, + RoundAction::Process => + if let Some(finality_proof) = self.handle_vote(vote)? { + let gossip_proof = GossipMessage::::FinalityProof(finality_proof); + let encoded_proof = gossip_proof.encode(); + self.gossip_engine.gossip_message(proofs_topic::(), encoded_proof, true); + }, RoundAction::Drop => metric_inc!(self, beefy_stale_votes), RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; @@ -554,7 +559,7 @@ where fn handle_vote( &mut self, vote: VoteMessage, AuthorityId, Signature>, - ) -> Result<(), Error> { + ) -> Result>, Error> { let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let block_number = vote.commitment.block_number; @@ -567,8 +572,9 @@ where ); // We created the `finality_proof` and know to be valid. // New state is persisted after finalization. - self.finalize(finality_proof)?; + self.finalize(finality_proof.clone())?; metric_inc!(self, beefy_good_votes_processed); + return Ok(Some(finality_proof)) }, VoteImportResult::Ok => { // Persist state after handling mandatory block vote. @@ -590,7 +596,7 @@ where VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes), VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes), }; - Ok(()) + Ok(None) } /// Provide BEEFY finality for block based on `finality_proof`: @@ -643,7 +649,7 @@ where // Update gossip validator votes filter. self.persisted_state - .current_gossip_filter() + .gossip_filter_config() .map(|filter| self.gossip_validator.update_filter(filter))?; Ok(()) } @@ -758,20 +764,20 @@ where BeefyKeystore::verify(&authority_id, &signature, &encoded_commitment) ); - let message = VoteMessage { commitment, id: authority_id, signature }; - - let encoded_message = message.encode(); - - metric_inc!(self, beefy_votes_sent); - - debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", message); - - if let Err(err) = self.handle_vote(message) { + let vote = VoteMessage { commitment, id: authority_id, signature }; + if let Some(finality_proof) = self.handle_vote(vote.clone()).map_err(|err| { error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err); + err + })? { + let encoded_proof = GossipMessage::::FinalityProof(finality_proof).encode(); + self.gossip_engine.gossip_message(proofs_topic::(), encoded_proof, true); + } else { + metric_inc!(self, beefy_votes_sent); + debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote); + let encoded_vote = GossipMessage::::Vote(vote).encode(); + self.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); } - self.gossip_engine.gossip_message(topic::(), encoded_message, false); - // Persist state after vote to avoid double voting in case of voter restarts. self.persisted_state.best_voted = target_number; metric_set!(self, beefy_best_voted, target_number); @@ -816,17 +822,28 @@ where let mut votes = Box::pin( self.gossip_engine - .messages_for(topic::()) + .messages_for(votes_topic::()) .filter_map(|notification| async move { - let vote = VoteMessage::, AuthorityId, Signature>::decode( - &mut ¬ification.message[..], - ) - .ok(); + let vote = GossipMessage::::decode(&mut ¬ification.message[..]) + .ok() + .and_then(|message| message.unwrap_vote()); trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote); vote }) .fuse(), ); + let mut gossip_proofs = Box::pin( + self.gossip_engine + .messages_for(proofs_topic::()) + .filter_map(|notification| async move { + let proof = GossipMessage::::decode(&mut ¬ification.message[..]) + .ok() + .and_then(|message| message.unwrap_finality_proof()); + trace!(target: LOG_TARGET, "🥩 Got gossip proof message: {:?}", proof); + proof + }) + .fuse(), + ); loop { // Act on changed 'state'. @@ -872,6 +889,20 @@ where return; } }, + justif = gossip_proofs.next() => { + if let Some(justif) = justif { + // Gossiped justifications have already been verified by `GossipValidator`. + if let Err(err) = self.triage_incoming_justif(justif) { + debug!(target: LOG_TARGET, "🥩 {}", err); + } + } else { + error!( + target: LOG_TARGET, + "🥩 Finality proofs gossiping stream terminated, closing worker." + ); + return; + } + }, // Finally process incoming votes. vote = votes.next() => { if let Some(vote) = vote { @@ -880,7 +911,10 @@ where debug!(target: LOG_TARGET, "🥩 {}", err); } } else { - error!(target: LOG_TARGET, "🥩 Votes gossiping stream terminated, closing worker."); + error!( + target: LOG_TARGET, + "🥩 Votes gossiping stream terminated, closing worker." + ); return; } },