From 72419d723d497ffa7163193759b15b3a0d963cb7 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 28 Apr 2024 03:44:03 -0400 Subject: [PATCH] Rebase onto develop, which reverted this PR, and re-apply this PR --- coordinator/tributary/src/lib.rs | 34 +- coordinator/tributary/src/tendermint/mod.rs | 32 +- coordinator/tributary/tendermint/src/block.rs | 17 +- coordinator/tributary/tendermint/src/ext.rs | 2 +- coordinator/tributary/tendermint/src/lib.rs | 552 ++++-------------- .../tributary/tendermint/src/message_log.rs | 15 +- coordinator/tributary/tendermint/src/round.rs | 2 + coordinator/tributary/tendermint/tests/ext.rs | 2 +- 8 files changed, 128 insertions(+), 528 deletions(-) diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index dcf38c68b..121ac3859 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt::Debug}; -use std::{sync::Arc, io, collections::VecDeque}; +use std::{sync::Arc, io}; use async_trait::async_trait; @@ -154,14 +154,6 @@ pub struct Tributary { synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, - - p2p_meta_task_handle: Arc, -} - -impl Drop for Tributary { - fn drop(&mut self) { - self.p2p_meta_task_handle.abort(); - } } impl Tributary { @@ -193,28 +185,7 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new())); - // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the - // P2P layer - let p2p_meta_task_handle = Arc::new( - tokio::spawn({ - let to_rebroadcast = to_rebroadcast.clone(); - let p2p = p2p.clone(); - async move { - loop { - let to_rebroadcast = to_rebroadcast.read().await.clone(); - for msg in to_rebroadcast { - p2p.broadcast(genesis, msg).await; - } - tokio::time::sleep(core::time::Duration::from_secs(60)).await; - } - } - }) - .abort_handle(), - ); - - let network = - TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; + let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new( @@ -235,7 +206,6 @@ impl Tributary { synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), - p2p_meta_task_handle, }) } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index e38efa5d3..0ce6232c9 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -1,8 +1,5 @@ use core::ops::Deref; -use std::{ - sync::Arc, - collections::{VecDeque, HashMap}, -}; +use std::{sync::Arc, collections::HashMap}; use async_trait::async_trait; @@ -270,8 +267,6 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, - pub(crate) to_rebroadcast: Arc>>>, - pub(crate) p2p: P, } @@ -308,26 +303,6 @@ impl Network for TendermintNetwork async fn broadcast(&mut self, msg: SignedMessageFor) { let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); - - // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second - // until the block it's trying to build is complete - // If the P2P layer drops a message before all nodes obtained access, or a node had an - // intermittent failure, this will ensure reconcilliation - // This is atrocious if there's no content-based deduplication protocol for messages actively - // being gossiped - // LibP2p, as used by Serai, is configured to content-based deduplicate - { - let mut to_rebroadcast_lock = self.to_rebroadcast.write().await; - to_rebroadcast_lock.push_back(to_broadcast.clone()); - // We should have, ideally, 3 * validators messages within a round - // Therefore, this should keep the most recent 2-rounds - // TODO: This isn't perfect. Each participant should just rebroadcast their latest round of - // messages - while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) { - to_rebroadcast_lock.pop_front(); - } - } - self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -366,7 +341,7 @@ impl Network for TendermintNetwork } } - async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { + async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; self @@ -428,9 +403,6 @@ impl Network for TendermintNetwork } } - // Since we've added a valid block, clear to_rebroadcast - *self.to_rebroadcast.write().await = VecDeque::new(); - Some(TendermintBlock( self.blockchain.write().await.build_block::(&self.signature_scheme()).serialize(), )) diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 6dfacfdb7..236b4816b 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashSet, HashMap}, }; -use parity_scale_codec::Encode; use serai_db::{Get, DbTxn, Db}; use crate::{ @@ -20,7 +19,7 @@ pub(crate) struct BlockData { pub(crate) number: BlockNumber, pub(crate) validator_id: Option, - pub(crate) proposal: Option, + pub(crate) our_proposal: Option, pub(crate) log: MessageLog, pub(crate) slashes: HashSet, @@ -43,7 +42,7 @@ impl BlockData { weights: Arc, number: BlockNumber, validator_id: Option, - proposal: Option, + our_proposal: Option, ) -> BlockData { BlockData { db, @@ -51,7 +50,7 @@ impl BlockData { number, validator_id, - proposal, + our_proposal, log: MessageLog::new(weights), slashes: HashSet::new(), @@ -108,17 +107,17 @@ impl BlockData { self.populate_end_time(round); } - // 11-13 + // L11-13 self.round = Some(RoundData::::new( round, time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), )); self.end_time.insert(round, self.round().end_time()); - // 14-21 + // L14-21 if Some(proposer) == self.validator_id { let (round, block) = self.valid.clone().unzip(); - block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block)) + block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block)) } else { self.round_mut().set_timeout(Step::Propose); None @@ -198,8 +197,8 @@ impl BlockData { assert!(!new_round); None?; } - // Put this message to the DB - txn.put(&msg_key, res.encode()); + // Put that we're sending this message to the DB + txn.put(&msg_key, []); txn.commit(); } diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index b3d568a23..3869d9d99 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync { async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); /// Validate a block. - async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; + async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; /// Add a block, returning the proposal for the next one. /// diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 9927473ca..145c51884 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -6,7 +6,7 @@ use std::{ collections::{VecDeque, HashMap}, }; -use parity_scale_codec::{Encode, Decode}; +use parity_scale_codec::{Encode, Decode, IoReader}; use futures_channel::mpsc; use futures_util::{ @@ -15,6 +15,8 @@ use futures_util::{ }; use tokio::time::sleep; +use serai_db::{Get, DbTxn, Db}; + pub mod time; use time::{sys_time, CanonicalInstant}; @@ -30,6 +32,11 @@ pub(crate) mod message_log; pub mod ext; use ext::*; +const MESSAGE_TAPE_KEY: &[u8] = b"tendermint-machine-message_tape"; +fn message_tape_key(genesis: [u8; 32]) -> Vec { + [MESSAGE_TAPE_KEY, &genesis].concat() +} + pub fn commit_msg(end_time: u64, id: &[u8]) -> Vec { [&end_time.to_le_bytes(), id].concat() } @@ -103,9 +110,23 @@ impl SignedMessage { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] +pub enum SlashReason { + FailToPropose, + InvalidBlock, + InvalidProposer, +} + +#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] +pub enum Evidence { + ConflictingMessages(Vec, Vec), + InvalidPrecommit(Vec), + InvalidValidRound(Vec), +} + #[derive(Clone, PartialEq, Eq, Debug)] -pub enum TendermintError { - Malicious(N::ValidatorId, Option), +pub enum TendermintError { + Malicious, Temporal, AlreadyHandled, InvalidEvidence, @@ -126,20 +147,6 @@ pub type SignedMessageFor = SignedMessage< <::SignatureScheme as SignatureScheme>::Signature, >; -#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] -pub enum SlashReason { - FailToPropose, - InvalidBlock, - InvalidMessage, -} - -#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] -pub enum Evidence { - ConflictingMessages(Vec, Vec), - InvalidPrecommit(Vec), - InvalidValidRound(Vec), -} - pub fn decode_signed_message(mut data: &[u8]) -> Option> { SignedMessageFor::::decode(&mut data).ok() } @@ -147,7 +154,7 @@ pub fn decode_signed_message(mut data: &[u8]) -> Option( data: &[u8], schema: &N::SignatureScheme, -) -> Result, TendermintError> { +) -> Result, TendermintError> { let msg = decode_signed_message::(data).ok_or(TendermintError::InvalidEvidence)?; // verify that evidence messages are signed correctly @@ -162,7 +169,7 @@ pub fn verify_tendermint_evience( evidence: &Evidence, schema: &N::SignatureScheme, commit: impl Fn(u64) -> Option>, -) -> Result<(), TendermintError> { +) -> Result<(), TendermintError> { match evidence { Evidence::ConflictingMessages(first, second) => { let first = decode_and_verify_signed_message::(first, schema)?.msg; @@ -186,15 +193,16 @@ pub fn verify_tendermint_evience( }; // TODO: We need to be passed in the genesis time to handle this edge case if msg.block.0 == 0 { - todo!("invalid precommit signature on first block") + Err(TendermintError::InvalidEvidence)? + // todo!("invalid precommit signature on first block") } // get the last commit let prior_commit = match commit(msg.block.0 - 1) { Some(c) => c, - // If we have yet to sync the block in question, we will return InvalidContent based + // If we have yet to sync the block in question, we will return InvalidEvidence based // on our own temporal ambiguity - // This will also cause an InvalidContent for anything using a non-existent block, + // This will also cause an InvalidEvidence for anything using a non-existent block, // yet that's valid behavior // TODO: Double check the ramifications of this _ => Err(TendermintError::InvalidEvidence)?, @@ -229,6 +237,16 @@ pub enum SlashEvent { WithEvidence(Evidence), } +// Struct for if various upon handlers have been triggered to ensure they don't trigger multiple +// times. +#[derive(Clone, PartialEq, Eq, Debug)] +struct Upons { + upon_prevotes: bool, + upon_successful_current_round_prevotes: bool, + upon_negative_current_round_prevotes: bool, + upon_precommits: bool, +} + /// A machine executing the Tendermint protocol. pub struct TendermintMachine { db: N::Db, @@ -337,6 +355,13 @@ impl TendermintMachine { ); sleep(time_until_round_end).await; + // Clear the message tape + { + let mut txn = self.db.txn(); + txn.del(&message_tape_key(self.genesis)); + txn.commit(); + } + // Clear our outbound message queue self.queue = VecDeque::new(); @@ -838,7 +863,7 @@ impl TendermintMachine { let validators = network.signature_scheme(); let weights = Arc::new(network.weights()); let validator_id = signer.validator_id().await; - // 01-10 + // L01-10 let mut machine = TendermintMachine { db: db.clone(), genesis, @@ -888,16 +913,16 @@ impl TendermintMachine { pub async fn run(mut self) { log::debug!(target: "tendermint", "running TendermintMachine"); + let mut rebroadcast_future = Box::pin(sleep(Duration::from_secs(60))).fuse(); loop { // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled // instead in this loop, and the popped value would be dropped with the next iteration - // While no other message has a higher priority right now, this is a safer practice let mut queue_future = if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; if let Some((our_message, msg, mut sig)) = futures_util::select_biased! { - // Handle a new block occurring externally (an external sync loop) + // Handle a new block occurring externally (from an external sync loop) // Has the highest priority as it makes all other futures here irrelevant msg = self.synced_block_recv.next() => { if let Some(SyncedBlock { number, block, commit }) = msg { @@ -931,16 +956,19 @@ impl TendermintMachine { Some((true, self.queue.pop_front().unwrap(), None)) }, + // L57-67 // Handle any timeouts step = self.block.round().timeout_future().fuse() => { // Remove the timeout so it doesn't persist, always being the selected future due to bias // While this does enable the timeout to be entered again, the timeout setting code will // never attempt to add a timeout after its timeout has expired + // (due to it setting an `upon` boolean) self.block.round_mut().timeouts.remove(&step); - // Only run if it's still the step in question - if self.block.round().step == step { - match step { - Step::Propose => { + + match step { + Step::Propose => { + // Only run if it's still the step in question + if self.block.round().step == step { // Slash the validator for not proposing when they should've log::debug!(target: "tendermint", "validator didn't propose when they should have"); // this slash will be voted on. @@ -953,14 +981,42 @@ impl TendermintMachine { ), ).await; self.broadcast(Data::Prevote(None)); - }, - Step::Prevote => self.broadcast(Data::Precommit(None)), - Step::Precommit => { - self.round(RoundNumber(self.block.round().number.0 + 1), None); - continue; } + }, + Step::Prevote => { + // Only run if it's still the step in question + if self.block.round().step == step { + self.broadcast(Data::Precommit(None)) + } + }, + Step::Precommit => { + self.round(RoundNumber(self.block.round().number.0 + 1), None); } + }; + + // Execute the upons now that the state has changed + self.all_any_round_upons(self.block.round().number).await; + self.all_current_round_upons().await; + + None + }, + + // If it's been more than 60s, rebroadcast our own messages + () = rebroadcast_future => { + let key = message_tape_key(self.genesis); + let messages = self.db.get(key).unwrap_or(vec![]); + let mut messages = messages.as_slice(); + + while !messages.is_empty() { + self.network.broadcast( + SignedMessageFor::::decode(&mut IoReader(&mut messages)) + .expect("saved invalid message to DB") + ).await; } + + // Reset the rebroadcast future + rebroadcast_future = Box::pin(sleep(core::time::Duration::from_secs(60))).fuse(); + None }, @@ -982,429 +1038,31 @@ impl TendermintMachine { } let sig = sig.unwrap(); - // TODO: message may internally call broadcast. We should check within broadcast it's not - // broadcasting our own message at this time. let signed_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; let res = self.message(&signed_msg).await; + // If this is our message, and we hit an invariant, we could be slashed. + // We only broadcast our message after running it ourselves, to ensure it doesn't error, to + // ensure we don't get slashed on invariants. if res.is_err() && our_message { panic!("honest node (ourselves) had invalid behavior"); } - // Only now should we allow broadcasts since we're sure an invariant wasn't reached causing - // us to have invalid messages. - - if res.is_ok() { - // Re-broadcast this since it's an original consensus message - self.network.broadcast(signed_msg).await; - } - - match res { - Ok(None) => {} - Ok(Some(block)) => { - let mut validators = vec![]; - let mut sigs = vec![]; - // Get all precommits for this round - for (validator, msgs) in &self.block.log.log[&msg.round] { - if let Some(signed) = msgs.get(&Step::Precommit) { - if let Data::Precommit(Some((id, sig))) = &signed.msg.data { - // If this precommit was for this block, include it - if *id == block.id() { - validators.push(*validator); - sigs.push(sig.clone()); - } - } - } - } - - let commit_msg = - commit_msg(self.block.end_time[&msg.round].canonical(), block.id().as_ref()); - let commit = Commit { - end_time: self.block.end_time[&msg.round].canonical(), - validators: validators.clone(), - signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), - }; - debug_assert!(self.network.verify_commit(block.id(), &commit)); - - log::info!( - target: "tendermint", - "TendermintMachine produced block {}", - hex::encode(block.id().as_ref()), - ); - let id = block.id(); - let proposal = self.network.add_block(block, commit).await; - log::trace!( - target: "tendermint", - "added block {} (produced by machine)", - hex::encode(id.as_ref()), - ); - self.reset(msg.round, proposal).await; - } - Err(TendermintError::Malicious(sender, evidence)) => { - let current_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; - - let slash = if let Some(ev) = evidence { - // if the malicious message contains a block, only vote to slash - // TODO: Should this decision be made at a higher level? - // A higher-level system may be able to verify if the contained block is fatally - // invalid - // A higher-level system may accept the bandwidth size of this, even if the issue is - // just the valid round field - if let Data::Proposal(_, _) = ¤t_msg.msg.data { - SlashEvent::Id( - SlashReason::InvalidBlock, - self.block.number.0, - self.block.round().number.0, - ) - } else { - // slash with evidence otherwise - SlashEvent::WithEvidence(ev) - } - } else { - // we don't have evidence. Slash with vote. - SlashEvent::Id( - SlashReason::InvalidMessage, - self.block.number.0, - self.block.round().number.0, - ) - }; - - // Each message that we're voting to slash over needs to be re-broadcasted so other - // validators also trigger their own votes - // TODO: should this be inside slash function? - if let SlashEvent::Id(_, _, _) = slash { - self.network.broadcast(current_msg).await; - } - - self.slash(sender, slash).await - } - Err( - TendermintError::Temporal | - TendermintError::AlreadyHandled | - TendermintError::InvalidEvidence, - ) => (), - } - } - } - } - - // Returns Ok(true) if this was a Precommit which had either no signature or its signature - // validated - // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet - // Returns Err if the signature was invalid - fn verify_precommit_signature( - &self, - signed: &SignedMessageFor, - ) -> Result> { - let msg = &signed.msg; - if let Data::Precommit(precommit) = &msg.data { - let Some((id, sig)) = precommit else { return Ok(true) }; - // Also verify the end_time of the commit - // Only perform this verification if we already have the end_time - // Else, there's a DoS where we receive a precommit for some round infinitely in the future - // which forces us to calculate every end time - if let Some(end_time) = self.block.end_time.get(&msg.round) { - if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) - { - log::warn!(target: "tendermint", "Validator produced an invalid commit signature"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidPrecommit(signed.encode())), - ))?; - } - return Ok(true); - } - } - Ok(false) - } - - async fn message( - &mut self, - signed: &SignedMessageFor, - ) -> Result, TendermintError> { - let msg = &signed.msg; - if msg.block != self.block.number { - Err(TendermintError::Temporal)?; - } - if (msg.block == self.block.number) && - (msg.round == self.block.round().number) && - (msg.data.step() == Step::Propose) - { - log::trace!( - target: "tendermint", - "received Propose for block {}, round {}", - msg.block.0, - msg.round.0, - ); - } - - // If this is a precommit, verify its signature - self.verify_precommit_signature(signed)?; - - // Only let the proposer propose - if matches!(msg.data, Data::Proposal(..)) && - (msg.sender != self.weights.proposer(msg.block, msg.round)) - { - log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); - // TODO: This should have evidence - Err(TendermintError::Malicious(msg.sender, None))?; - }; - - if !self.block.log.log(signed.clone())? { - return Err(TendermintError::AlreadyHandled); - } - log::trace!( - target: "tendermint", - "received new tendermint message (block: {}, round: {}, step: {:?})", - msg.block.0, - msg.round.0, - msg.data.step(), - ); - - // All functions, except for the finalizer and the jump, are locked to the current round - - // Run the finalizer to see if it applies - // 49-52 - if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) { - let proposer = self.weights.proposer(self.block.number, msg.round); - - // Get the proposal - if let Some(proposal_signed) = self.block.log.get(msg.round, proposer, Step::Propose) { - if let Data::Proposal(_, block) = &proposal_signed.msg.data { - // Check if it has gotten a sufficient amount of precommits - // Uses a junk signature since message equality disregards the signature - if self.block.log.has_consensus( - msg.round, - &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), - ) { - // If msg.round is in the future, these Precommits won't have their inner signatures - // verified - // It should be impossible for msg.round to be in the future however, as this requires - // 67% of validators to Precommit, and we jump on 34% participating in the new round - // The one exception would be if a validator had 34%, and could cause participation to - // go from 33% (not enough to jump) to 67%, without executing the below code - // This also would require the local machine to be outside of allowed time tolerances, - // or the validator with 34% to not be publishing Prevotes (as those would cause a - // a jump) - // Both are invariants - // TODO: Replace this panic with an inner signature check - assert!(msg.round.0 <= self.block.round().number.0); - - log::debug!(target: "tendermint", "block {} has consensus", msg.block.0); - return Ok(Some(block.clone())); - } - } - } - } - - // Else, check if we need to jump ahead - #[allow(clippy::comparison_chain)] - if msg.round.0 < self.block.round().number.0 { - // Prior round, disregard if not finalizing - return Ok(None); - } else if msg.round.0 > self.block.round().number.0 { - // 55-56 - // Jump, enabling processing by the below code - if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { - log::debug!( - target: "tendermint", - "jumping from round {} to round {}", - self.block.round().number.0, - msg.round.0, - ); - - // Jump to the new round. - let proposer = self.round(msg.round, None); - - // If this round already has precommit messages, verify their signatures - let round_msgs = self.block.log.log[&msg.round].clone(); - for (validator, msgs) in &round_msgs { - if let Some(existing) = msgs.get(&Step::Precommit) { - if let Ok(res) = self.verify_precommit_signature(existing) { - // Ensure this actually verified the signature instead of believing it shouldn't yet - assert!(res); - } else { - // Remove the message so it isn't counted towards forming a commit/included in one - // This won't remove the fact they precommitted for this block hash in the MessageLog - // TODO: Don't even log these in the first place until we jump, preventing needing - // to do this in the first place - let msg = self - .block - .log - .log - .get_mut(&msg.round) - .unwrap() - .get_mut(validator) - .unwrap() - .remove(&Step::Precommit) - .unwrap(); - - // Slash the validator for publishing an invalid commit signature - self - .slash( - *validator, - SlashEvent::WithEvidence(Evidence::InvalidPrecommit(msg.encode())), - ) - .await; - } - } - } - - // If we're the proposer, return now we don't waste time on the current round - // (as it doesn't have a proposal, since we didn't propose, and cannot complete) - if proposer { - return Ok(None); - } - } else { - // Future round which we aren't ready to jump to, so return for now - return Ok(None); - } - } - - // msg.round is now guaranteed to be equal to self.block.round().number - debug_assert_eq!(msg.round, self.block.round().number); - - // The paper executes these checks when the step is prevote. Making sure this message warrants - // rerunning these checks is a sane optimization since message instances is a full iteration - // of the round map - if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { - let (participation, weight) = - self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); - let threshold_weight = self.weights.threshold(); - if participation < threshold_weight { - log::trace!( - target: "tendermint", - "progess towards setting prevote timeout, participation: {}, needed: {}", - participation, - threshold_weight, - ); - } - // 34-35 - if participation >= threshold_weight { - log::trace!( - target: "tendermint", - "setting timeout for prevote due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Prevote); - } - - // 44-46 - if weight >= threshold_weight { - self.broadcast(Data::Precommit(None)); - return Ok(None); - } - } - - // 47-48 - if matches!(msg.data, Data::Precommit(_)) && - self.block.log.has_participation(self.block.round().number, Step::Precommit) - { - log::trace!( - target: "tendermint", - "setting timeout for precommit due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Precommit); - } - - // All further operations require actually having the proposal in question - let proposer = self.weights.proposer(self.block.number, self.block.round().number); - let (vr, block) = if let Some(proposal_signed) = - self.block.log.get(self.block.round().number, proposer, Step::Propose) - { - if let Data::Proposal(vr, block) = &proposal_signed.msg.data { - (vr, block) - } else { - panic!("message for Step::Propose didn't have Data::Proposal"); - } - } else { - return Ok(None); - }; - - // 22-33 - if self.block.round().step == Step::Propose { - // Delay error handling (triggering a slash) until after we vote. - let (valid, err) = match self.network.validate(block).await { - Ok(()) => (true, Ok(None)), - Err(BlockError::Temporal) => (false, Ok(None)), - Err(BlockError::Fatal) => (false, { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None)) - }), - }; - // Create a raw vote which only requires block validity as a basis for the actual vote. - let raw_vote = Some(block.id()).filter(|_| valid); - - // If locked is none, it has a round of -1 according to the protocol. That satisfies - // 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some - // with different IDs, the function on 22 rejects yet the function on 28 has one other - // condition - let locked = self.block.locked.as_ref().map_or(true, |(_, id)| id == &block.id()); - let mut vote = raw_vote.filter(|_| locked); - - if let Some(vr) = vr { - // Malformed message - if vr.0 >= self.block.round().number.0 { - log::warn!(target: "tendermint", "Validator claimed a round from the future was valid"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidValidRound(signed.encode())), - ))?; - } - - if self.block.log.has_consensus(*vr, &Data::Prevote(Some(block.id()))) { - // Allow differing locked values if the proposal has a newer valid round - // This is the other condition described above - if let Some((locked_round, _)) = self.block.locked.as_ref() { - vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0)); - } - - self.broadcast(Data::Prevote(vote)); - return err; + // Save this message to a linear tape of all our messages for this block, if ours + // TODO: Since we do this after we mark this message as sent to prevent equivocations, a + // precisely time reboot could cause this message marked as sent yet not added to the tape + if our_message { + let message_tape_key = message_tape_key(self.genesis); + let mut txn = self.db.txn(); + let mut message_tape = txn.get(&message_tape_key).unwrap_or(vec![]); + message_tape.extend(signed_msg.encode()); + txn.put(&message_tape_key, message_tape); } - } else { - self.broadcast(Data::Prevote(vote)); - return err; - } - - return Ok(None); - } - if self.block.valid.as_ref().map_or(true, |(round, _)| round != &self.block.round().number) { - // 36-43 - - // The run once condition is implemented above. Since valid will always be set by this, it - // not being set, or only being set historically, means this has yet to be run - - if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { - match self.network.validate(block).await { - // BlockError::Temporal is due to a temporal error we have, yet a supermajority of the - // network does not, Because we do not believe this block to be fatally invalid, and - // because a supermajority deems it valid, accept it. - Ok(()) | Err(BlockError::Temporal) => (), - Err(BlockError::Fatal) => { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None))? - } - }; - - self.block.valid = Some((self.block.round().number, block.clone())); - if self.block.round().step == Step::Prevote { - self.block.locked = Some((self.block.round().number, block.id())); - self.broadcast(Data::Precommit(Some(( - block.id(), - self - .signer - .sign(&commit_msg( - self.block.end_time[&self.block.round().number].canonical(), - block.id().as_ref(), - )) - .await, - )))); + // Re-broadcast this since it's an original consensus message worth handling + if res.is_ok() { + self.network.broadcast(signed_msg).await; } } } - - Ok(None) } } diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary/tendermint/src/message_log.rs index a150617be..716f6d64e 100644 --- a/coordinator/tributary/tendermint/src/message_log.rs +++ b/coordinator/tributary/tendermint/src/message_log.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap}; use parity_scale_codec::Encode; -use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; +use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence}; type RoundLog = HashMap<::ValidatorId, HashMap>>; pub(crate) struct MessageLog { @@ -16,7 +16,7 @@ impl MessageLog { } // Returns true if it's a new message - pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result> { + pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result { let msg = &signed.msg; // Clarity, and safety around default != new edge cases let round = self.log.entry(msg.round).or_insert_with(HashMap::new); @@ -30,10 +30,7 @@ impl MessageLog { target: "tendermint", "Validator sent multiple messages for the same block + round + step" ); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())), - ))?; + Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?; } return Ok(false); } @@ -47,7 +44,8 @@ impl MessageLog { pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor) -> (u64, u64) { let mut participating = 0; let mut weight = 0; - for (participant, msgs) in &self.log[&round] { + let Some(log) = self.log.get(&round) else { return (0, 0) }; + for (participant, msgs) in log { if let Some(msg) = msgs.get(&data.step()) { let validator_weight = self.weights.weight(*participant); participating += validator_weight; @@ -73,7 +71,8 @@ impl MessageLog { // Check if a supermajority of nodes have participated on a specific step pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool { let mut participating = 0; - for (participant, msgs) in &self.log[&round] { + let Some(log) = self.log.get(&round) else { return false }; + for (participant, msgs) in log { if msgs.get(&step).is_some() { participating += self.weights.weight(*participant); } diff --git a/coordinator/tributary/tendermint/src/round.rs b/coordinator/tributary/tendermint/src/round.rs index 445c27848..a97e3ed1e 100644 --- a/coordinator/tributary/tendermint/src/round.rs +++ b/coordinator/tributary/tendermint/src/round.rs @@ -57,6 +57,7 @@ impl RoundData { // Poll all set timeouts, returning the Step whose timeout has just expired pub(crate) async fn timeout_future(&self) -> Step { + /* let now = Instant::now(); log::trace!( target: "tendermint", @@ -64,6 +65,7 @@ impl RoundData { self.step, self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::>() ); + */ let timeout_future = |step| { let timeout = self.timeouts.get(&step).copied(); diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index 3b3cf7c3b..bec95ddcd 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -145,7 +145,7 @@ impl Network for TestNetwork { println!("Slash for {id} due to {event:?}"); } - async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { + async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> { block.valid }