diff --git a/febft-pbft-consensus/Cargo.toml b/febft-pbft-consensus/Cargo.toml index 3575a48a..59c377c5 100644 --- a/febft-pbft-consensus/Cargo.toml +++ b/febft-pbft-consensus/Cargo.toml @@ -23,69 +23,34 @@ serialize_capnp = ["atlas-capnp"] bincode = "2.0.0-rc.3" num_cpus = "1" mimalloc = { version = "*", default-features = false } -rand = {version = "0.8.5", features = ["small_rng"] } +rand = { version = "0.8.5", features = ["small_rng"] } +rand_core = "0" [dependencies] anyhow = "1.0" thiserror = "1.0" +chrono = "0" getset = "0.1.2" +lazy_static = "1" atlas-common = { path = "../../Atlas/Atlas-Common", default-features = true } atlas-communication = { path = "../../Atlas/Atlas-Communication" } atlas-core = { path = "../../Atlas/Atlas-Core" } atlas-capnp = { path = "../../Atlas/Atlas-capnp", optional = true } -atlas-metrics = {path = "../../Atlas/Atlas-Metrics"} +atlas-metrics = { path = "../../Atlas/Atlas-Metrics" } -capnp = { version = "0.16" } -fastrand = "1.7.0" -bytes = "1.4.0" -chrono = "0.4" -intmap = "2.0.0" +intmap = "2" either = "1" -oneshot = "0.1" -futures = "0.3.21" -futures-timer = "3" -async-tls = "0.12.0" -rustls = "0.20.6" -webpki = "0.22.0" -parking_lot = "0.12.1" -dashmap = "5.1.0" -thread_local = "1.1.4" -num_cpus = "1" -socket2 = "0.4" -event-listener = "2.5.2" -linked-hash-map = "0.5" -rand_core = { version = "0.6", features = ["getrandom"] } -smallvec = { version = "1", features = ["union", "write", "const_generics"] } -async-std = { version = "1", optional = true } -tokio = { version = "1.25.0", features = ["full"], optional = true } -tokio-util = { version = "0.7.1", features = ["compat"], optional = true } -tokio-metrics = { version = "0.1.0", optional = true } -ring = { version = "0.17.8", optional = true } -threadpool-crossbeam-channel = { version = "1.8.0", optional = true } #async-semaphore = { version = "1", optional = true } -serde = { version = "*", features = ["derive", "rc"]} +serde = { version = "*", features = ["derive", "rc"] } +serde_bytes = { version = "0", optional = true } bincode = { version = "2.0.0-rc.3", features = ["serde"], optional = true } -flume = { version = "0.10", optional = true } -async-channel = { version = "1", optional = true } -twox-hash = { version = "1", optional = true } -serde_bytes = { version = "0.11", optional = true } -fxhash = { version = "0.2", optional = true } -dsrust = { version = "0.1.9", git = "https://github.com/nuno1212s/DSRust", optional = true } -#dsrust = { path = "/home/nunogneto/Documents/Development/Rust/dsrust" } -crossbeam-channel = { version = "0.5.2", optional = true } -crossbeam-skiplist = "0.1.1" -rocksdb = { version = "0.20.1", optional = true } - -log = "0.4.17" -env_logger = "0.10.0" -log4rs = { version = "1.1.1", features = ["file_appender"] } +tracing = "*" #tracing = "0.1.32" #tracing-subscriber = { version = "0.3.11", features = ["fmt"] } -num-bigint = "0.4.3" -num-traits = "0.2.15" - - +num-bigint = "*" +num-traits = "*" +event-listener = "*" diff --git a/febft-pbft-consensus/src/bft/consensus/accessory/replica/mod.rs b/febft-pbft-consensus/src/bft/consensus/accessory/replica/mod.rs index e0a2dd43..bd8a331c 100644 --- a/febft-pbft-consensus/src/bft/consensus/accessory/replica/mod.rs +++ b/febft-pbft-consensus/src/bft/consensus/accessory/replica/mod.rs @@ -1,8 +1,9 @@ use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; - +use std::time::SystemTime; use chrono::Utc; -use log::debug; + +use tracing::debug; use atlas_common::node_id::NodeId; use atlas_common::ordering::{Orderable, SeqNo}; 
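// [Editor's note] The dependency changes above replace the `log` facade with
// `tracing`, and the matching import swaps (`log::debug` -> `tracing::debug`)
// recur throughout the diff below. The level macros are call-site compatible,
// and `tracing` additionally accepts structured fields. A minimal sketch; the
// function and field names here are illustrative, not part of this patch:
use tracing::{debug, info};

fn log_to_tracing_example(node_id: u32) {
    debug!("plain message, exactly as with the `log` macros");
    info!(node = node_id, "same macro, now carrying a structured field");
}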
@@ -22,15 +23,15 @@ use crate::bft::sync::view::ViewInfo; use crate::bft::{SysMsg, PBFT}; pub struct ReplicaAccessory - where - RQ: SerType, +where + RQ: SerType, { speculative_commits: Arc>>>>, } impl AccessoryConsensus for ReplicaAccessory - where - RQ: SerType + 'static, +where + RQ: SerType + 'static, { fn handle_partial_pre_prepare( &mut self, @@ -41,7 +42,8 @@ impl AccessoryConsensus for ReplicaAccessory _node: &NT, ) where NT: OrderProtocolSendNode>, - {} + { + } fn handle_pre_prepare_phase_completed( &mut self, @@ -94,7 +96,7 @@ impl AccessoryConsensus for ReplicaAccessory Some(digest), Some(&*key_pair), ) - .into_inner(); + .into_inner(); // store serialized header + message let serialized = SerializedMessage::new(message.clone(), buf.clone()); @@ -138,7 +140,8 @@ impl AccessoryConsensus for ReplicaAccessory _node: &NT, ) where NT: OrderProtocolSendNode>, - {} + { + } fn handle_preparing_quorum( &mut self, @@ -201,7 +204,8 @@ impl AccessoryConsensus for ReplicaAccessory _node: &NT, ) where NT: OrderProtocolSendNode>, - {} + { + } fn handle_committing_quorum( &mut self, @@ -212,12 +216,13 @@ impl AccessoryConsensus for ReplicaAccessory _node: &NT, ) where NT: OrderProtocolSendNode>, - {} + { + } } impl Default for ReplicaAccessory - where - RQ: SerType, +where + RQ: SerType, { fn default() -> Self { Self::new() @@ -225,8 +230,8 @@ impl Default for ReplicaAccessory } impl ReplicaAccessory - where - RQ: SerType, +where + RQ: SerType, { pub fn new() -> Self { Self { @@ -247,8 +252,8 @@ fn valid_spec_commits( seq_no: SeqNo, view: &ViewInfo, ) -> bool - where - RQ: SerType, +where + RQ: SerType, { let len = speculative_commits.len(); diff --git a/febft-pbft-consensus/src/bft/consensus/decision/mod.rs b/febft-pbft-consensus/src/bft/consensus/decision/mod.rs index f84e7d26..bd894088 100644 --- a/febft-pbft-consensus/src/bft/consensus/decision/mod.rs +++ b/febft-pbft-consensus/src/bft/consensus/decision/mod.rs @@ -5,7 +5,7 @@ use std::time::Instant; use atlas_common::Err; use chrono::Utc; -use log::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use thiserror::Error; use atlas_common::error::*; @@ -16,7 +16,7 @@ use atlas_communication::message::Header; use atlas_core::messages::{ClientRqInfo, SessionBased}; use atlas_core::ordering_protocol::networking::OrderProtocolSendNode; use atlas_core::ordering_protocol::ShareableMessage; -use atlas_core::timeouts::Timeouts; +use atlas_core::timeouts::timeout::TimeoutModHandle; use atlas_metrics::metrics::metric_duration; use crate::bft::consensus::accessory::replica::ReplicaAccessory; @@ -223,6 +223,7 @@ where } } + #[instrument(skip(self), level = "debug")] pub fn poll(&mut self) -> DecisionPollStatus { match self.phase { DecisionPhase::Initialize => { @@ -271,11 +272,12 @@ where } /// Process a message relating to this consensus instance + #[instrument(skip(self, synchronizer, timeouts, node), level = "debug")] pub fn process_message( &mut self, s_message: ShareableMessage>, synchronizer: &Synchronizer, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, node: &Arc, ) -> Result> where @@ -665,6 +667,7 @@ where } /// Finalize this consensus decision and return the information about the batch + #[instrument(skip(self), level = "debug")] pub fn finalize(self) -> Result> { if let DecisionPhase::Decided = self.phase { let seq = self.sequence_number(); @@ -703,7 +706,7 @@ where fn request_batch_received( header: &Header, pre_prepare: &ConsensusMessage, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, synchronizer: 
&Synchronizer, log: &WorkingDecisionLog, ) -> Vec diff --git a/febft-pbft-consensus/src/bft/consensus/mod.rs b/febft-pbft-consensus/src/bft/consensus/mod.rs index 7fcb1007..5b18e5af 100644 --- a/febft-pbft-consensus/src/bft/consensus/mod.rs +++ b/febft-pbft-consensus/src/bft/consensus/mod.rs @@ -2,10 +2,11 @@ use std::cmp::Reverse; use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; +use std::fmt::{Debug, Formatter}; use either::Either; -use event_listener::Event; -use log::{debug, error, info, trace, warn}; +use event_listener::{Event, Listener}; +use tracing::{debug, error, info, trace, warn, instrument}; use atlas_common::error::*; use atlas_common::globals::ReadOnly; @@ -20,7 +21,7 @@ use atlas_communication::message::{Header, StoredMessage}; use atlas_core::messages::{ClientRqInfo, SessionBased}; use atlas_core::ordering_protocol::networking::OrderProtocolSendNode; use atlas_core::ordering_protocol::{Decision, ShareableMessage}; -use atlas_core::timeouts::Timeouts; +use atlas_core::timeouts::timeout::TimeoutModHandle; use atlas_metrics::metrics::metric_increment; use crate::bft::consensus::decision::{ @@ -59,7 +60,6 @@ pub enum ConsensusStatus { Decided(MaybeVec>), } -#[derive(Debug)] /// Represents the status of calling `poll()` on a `Consensus`. pub enum ConsensusPollStatus { /// The `Replica` associated with this `Consensus` should @@ -119,12 +119,10 @@ impl TboQueue { fn advance_queue(&mut self) -> MessageQueue { self.curr_seq = self.curr_seq.next(); - let pre_prepares = tbo_advance_message_queue_return(&mut self.pre_prepares) - .unwrap_or_default(); - let prepares = - tbo_advance_message_queue_return(&mut self.prepares).unwrap_or_default(); - let commits = - tbo_advance_message_queue_return(&mut self.commits).unwrap_or_default(); + let pre_prepares = + tbo_advance_message_queue_return(&mut self.pre_prepares).unwrap_or_default(); + let prepares = tbo_advance_message_queue_return(&mut self.prepares).unwrap_or_default(); + let commits = tbo_advance_message_queue_return(&mut self.commits).unwrap_or_default(); MessageQueue::from_messages(pre_prepares, prepares, commits) } @@ -226,7 +224,7 @@ where /// for each consensus instance consensus_guard: Arc, /// A reference to the timeouts - timeouts: Timeouts, + timeouts: TimeoutModHandle, /// Check if we are currently recovering from a fault, meaning we should ignore timeouts is_recovering: bool, } @@ -241,7 +239,7 @@ where seq_no: SeqNo, watermark: u32, consensus_guard: Arc, - timeouts: Timeouts, + timeouts: TimeoutModHandle, ) -> Self { let mut curr_seq = seq_no; @@ -272,6 +270,7 @@ where } /// Queue a given message into our message queues. 
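// [Editor's note] On the `use event_listener::{Event, Listener};` change above:
// in recent event-listener releases (4.x/5.x) the blocking `wait()` family moved
// from the concrete `EventListener` type onto the `Listener` trait, so the trait
// now has to be in scope. A self-contained sketch of the pattern, assuming
// event-listener 5.x:
use event_listener::{Event, Listener};

fn wait_until_notified(event: &Event) {
    let listener = event.listen(); // register interest before re-checking state
    event.notify(1);               // wake at most one registered listener
    listener.wait();               // `wait()` is supplied by the `Listener` trait
}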
+ #[instrument(skip(self), level = "debug")] pub fn queue(&mut self, message: ShareableMessage>) { let message_seq = message.message().sequence_number(); @@ -325,8 +324,9 @@ where self.signalled.push_signalled(message_seq); } } - + /// Poll the given consensus + #[instrument(skip_all, level = "debug", ret)] pub fn poll(&mut self) -> ConsensusPollStatus { trace!("Current signal queue: {:?}", self.signalled); @@ -378,11 +378,12 @@ where ConsensusPollStatus::Recv } + #[instrument(skip(self, synchronizer, timeouts, node), level = "debug")] pub fn process_message( &mut self, s_message: ShareableMessage>, synchronizer: &Synchronizer, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, node: &Arc, ) -> Result> where @@ -506,6 +507,7 @@ where } /// Finalize the next consensus instance if possible + #[instrument(skip(self), level = "debug")] pub fn finalize(&mut self, view: &ViewInfo) -> Result>> { // If the decision can't be finalized, then we can't finalize the batch if let Some(decision) = self.decisions.front() { @@ -537,6 +539,7 @@ where /// Advance to the next instance of the consensus /// This will also create the necessary new decision to keep the pending decisions /// equal to the water mark + #[instrument(skip(self), level = "debug")] pub fn next_instance(&mut self, view: &ViewInfo) -> ConsensusDecision { let decision = self.decisions.pop_front().unwrap(); @@ -550,7 +553,7 @@ where self.is_recovering = false; // This means the queue is empty. - self.timeouts.cancel_client_rq_timeouts(None); + let _ = self.timeouts.cancel_all_timeouts(); } let new_seq_no = self @@ -569,6 +572,7 @@ where decision } + #[instrument(skip(self), level = "debug")] pub fn install_sequence_number(&mut self, novel_seq_no: SeqNo, view: &ViewInfo) { info!( "{:?} // Installing sequence number {:?} vs current {:?}", @@ -719,6 +723,7 @@ where } /// Catch up to the quorums latest decided consensus + #[instrument(skip(self, proof, log), level = "debug", fields(proof_seq = proof.sequence_number().into_u32()))] pub fn catch_up_to_quorum( &mut self, view: &ViewInfo, @@ -737,6 +742,7 @@ where /// Create a fake `PRE-PREPARE`. This is useful during the view /// change protocol. + #[instrument(skip(self, requests), level = "debug", fields(request_count = requests.len()))] pub fn forge_propose(&self, requests: Vec>, view: &ViewInfo) -> SysMsg { PBFTMessage::Consensus(ConsensusMessage::new( self.sequence_number(), @@ -746,6 +752,7 @@ where } /// Install a given view into the current consensus decisions. 
+ #[instrument(skip(self), level = "debug")] pub fn install_view(&mut self, view: &ViewInfo) { let view_index = match view .sequence_number() @@ -832,12 +839,13 @@ where } /// Finalize the view change protocol + #[instrument(skip(self, synchronizer, timeouts, node, _log), level = "debug")] pub fn finalize_view_change( &mut self, (header, message): (Header, ConsensusMessage), new_view: &ViewInfo, synchronizer: &Synchronizer, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, _log: &mut Log, node: &Arc, ) -> Result> @@ -1103,3 +1111,19 @@ impl Signals { self.signaled_seq_no.clear(); } } + +impl Debug for ConsensusPollStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ConsensusPollStatus::Recv => { + write!(f, "Receive messages") + } + ConsensusPollStatus::NextMessage(_) => { + write!(f, "Message Ready") + } + ConsensusPollStatus::Decided(_) => { + write!(f, "Decided instance") + } + } + } +} \ No newline at end of file diff --git a/febft-pbft-consensus/src/bft/log/deciding/mod.rs b/febft-pbft-consensus/src/bft/log/deciding/mod.rs index 5791abad..3d488ebf 100644 --- a/febft-pbft-consensus/src/bft/log/deciding/mod.rs +++ b/febft-pbft-consensus/src/bft/log/deciding/mod.rs @@ -454,11 +454,7 @@ pub fn pre_prepare_index_from_digest_opt( None => { Err!(DecidingLogError::PrePrepareNotPartOfSet( *digest, - prepare_set - .iter() - .cloned() - .flatten() - .collect() + prepare_set.iter().cloned().flatten().collect() )) } Some(pos) => Ok(pos), diff --git a/febft-pbft-consensus/src/bft/log/mod.rs b/febft-pbft-consensus/src/bft/log/mod.rs index d2777e8d..1d703692 100644 --- a/febft-pbft-consensus/src/bft/log/mod.rs +++ b/febft-pbft-consensus/src/bft/log/mod.rs @@ -84,12 +84,7 @@ where batch_meta: _, } = completed; - let metadata = ProofMetadata::new( - seq, - digest, - pre_prepare_ordering, - client_requests.len(), - ); + let metadata = ProofMetadata::new(seq, digest, pre_prepare_ordering, client_requests.len()); let FinishedMessageLog { pre_prepares, diff --git a/febft-pbft-consensus/src/bft/message/mod.rs b/febft-pbft-consensus/src/bft/message/mod.rs index 2833b6c5..bf7208f3 100644 --- a/febft-pbft-consensus/src/bft/message/mod.rs +++ b/febft-pbft-consensus/src/bft/message/mod.rs @@ -4,7 +4,6 @@ use std::fmt::{Debug, Formatter}; use std::io::Write; - use getset::Getters; #[cfg(feature = "serialize_serde")] use serde::{Deserialize, Serialize}; diff --git a/febft-pbft-consensus/src/bft/message/serialize/mod.rs b/febft-pbft-consensus/src/bft/message/serialize/mod.rs index ba3d3a08..b8e974c4 100644 --- a/febft-pbft-consensus/src/bft/message/serialize/mod.rs +++ b/febft-pbft-consensus/src/bft/message/serialize/mod.rs @@ -10,13 +10,10 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; - -use bytes::Bytes; - use atlas_common::error::*; use atlas_common::ordering::Orderable; use atlas_common::serialization_helper::SerType; -use atlas_communication::message::{Header}; +use atlas_communication::message::Header; use atlas_communication::reconfiguration::NetworkInformationProvider; use atlas_core::ordering_protocol::loggable::PersistentOrderProtocolTypes; use atlas_core::ordering_protocol::networking::serialize::{ @@ -25,21 +22,16 @@ use atlas_core::ordering_protocol::networking::serialize::{ use crate::bft::log::decisions::{Proof, ProofMetadata}; use crate::bft::message::{ - ConsensusMessage, ConsensusMessageKind, PBFTMessage, - ViewChangeMessageKind, + ConsensusMessage, ConsensusMessageKind, PBFTMessage, ViewChangeMessageKind, }; use 
crate::bft::sync::view::ViewInfo; - #[cfg(feature = "serialize_capnp")] pub mod capnp; #[cfg(feature = "serialize_serde")] pub mod serde; -/// The buffer type used to serialize messages into. -pub type Buf = Bytes; - pub fn serialize_consensus(w: &mut W, message: &ConsensusMessage) -> Result<()> where RQ: SerType, diff --git a/febft-pbft-consensus/src/bft/metric/mod.rs b/febft-pbft-consensus/src/bft/metric/mod.rs index c93469d9..8ec69727 100644 --- a/febft-pbft-consensus/src/bft/metric/mod.rs +++ b/febft-pbft-consensus/src/bft/metric/mod.rs @@ -1,6 +1,4 @@ -use atlas_metrics::metrics::{ - metric_duration, metric_store_count, MetricKind, -}; +use atlas_metrics::metrics::{metric_duration, metric_store_count, MetricKind}; use atlas_metrics::{MetricLevel, MetricRegistry}; use std::time::Instant; diff --git a/febft-pbft-consensus/src/bft/mod.rs b/febft-pbft-consensus/src/bft/mod.rs index 11b2ebb7..563185d6 100644 --- a/febft-pbft-consensus/src/bft/mod.rs +++ b/febft-pbft-consensus/src/bft/mod.rs @@ -8,9 +8,10 @@ use std::fmt::Debug; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use ::log::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use anyhow::anyhow; use either::Either; +use lazy_static::lazy_static; use crate::bft::config::PBFTConfig; use crate::bft::consensus::{ @@ -53,7 +54,7 @@ use atlas_core::ordering_protocol::{ use atlas_core::reconfiguration_protocol::ReconfigurationProtocol; use atlas_core::request_pre_processing::RequestPreProcessor; use atlas_core::serialize::ReconfigurationProtocolMessage; -use atlas_core::timeouts::{RqTimeout, Timeouts}; +use atlas_core::timeouts::timeout::{ModTimeout, TimeoutModHandle, TimeoutableMod}; pub mod config; pub mod consensus; @@ -69,6 +70,12 @@ pub type PBFT = PBFTConsensus; // The message type for this consensus protocol pub type SysMsg = as OrderingProtocolMessage>::ProtocolMessage; +lazy_static! { + + static ref MOD_NAME: Arc = Arc::from("FEBFT"); + +} + #[derive(Clone, PartialEq, Eq, Debug)] /// Which phase of the consensus algorithm are we currently executing pub enum ConsensusPhase { @@ -106,7 +113,7 @@ pub struct PBFTOrderProtocol /// The request pre processor pre_processor: RequestPreProcessor, // A reference to the timeouts layer - timeouts: Timeouts, + timeouts: TimeoutModHandle, //The proposer guard consensus_guard: Arc, // Check if unordered requests can be proposed. 
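// [Editor's note] `MOD_NAME` above is allocated once, on first access, by
// `lazy_static!`; every later `mod_name()` call only bumps a reference count.
// Standalone equivalent of the pattern (the `str` type parameter of `Arc` is
// assumed here, matching the `Arc::from("FEBFT")` initializer):
use std::sync::Arc;

use lazy_static::lazy_static;

lazy_static! {
    static ref MOD_NAME: Arc<str> = Arc::from("FEBFT");
}

fn mod_name() -> Arc<str> {
    MOD_NAME.clone() // cheap refcount bump; the string is never re-allocated
}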
@@ -168,6 +175,58 @@ impl NetworkedOrderProtocolInitializer for PBFTOrderProtocol TimeoutableMod, RQ>> +for PBFTOrderProtocol + where + RQ: SerType + SessionBased + 'static, + NT: OrderProtocolSendNode> + 'static, +{ + fn mod_name() -> Arc { + MOD_NAME.clone() + } + + fn handle_timeout( + &mut self, + timeout: Vec, + ) -> Result, RQ>> { + if self.consensus.is_catching_up() { + warn!( + "{:?} // Ignoring timeouts while catching up", + self.node.id() + ); + + return Ok(OPExecResult::MessageDropped); + } + + let status = self + .synchronizer + .client_requests_timed_out(self.node.id(), &timeout); + + if let SynchronizerStatus::RequestsTimedOut { forwarded, stopped } = status { + if !forwarded.is_empty() { + let requests = self.pre_processor.clone_pending_rqs(forwarded); + + self.synchronizer.forward_requests(requests, &*self.node); + } + + if !stopped.is_empty() { + let stopped = self.pre_processor.clone_pending_rqs(stopped); + + self.switch_phase(ConsensusPhase::SyncPhase); + + self.synchronizer.begin_view_change( + Some(stopped), + &*self.node, + &self.timeouts, + &self.message_log, + ); + } + }; + + Ok(OPExecResult::MessageProcessedNoUpdate) + } +} + impl OrderingProtocol for PBFTOrderProtocol where RQ: SerType + SessionBased + 'static, @@ -247,47 +306,6 @@ impl OrderingProtocol for PBFTOrderProtocol Ok(()) } - - fn handle_timeout( - &mut self, - timeout: Vec, - ) -> Result, RQ>> { - if self.consensus.is_catching_up() { - warn!( - "{:?} // Ignoring timeouts while catching up", - self.node.id() - ); - - return Ok(OPExecResult::MessageDropped); - } - - let status = self - .synchronizer - .client_requests_timed_out(self.node.id(), &timeout); - - if let SynchronizerStatus::RequestsTimedOut { forwarded, stopped } = status { - if !forwarded.is_empty() { - let requests = self.pre_processor.clone_pending_rqs(forwarded); - - self.synchronizer.forward_requests(requests, &*self.node); - } - - if !stopped.is_empty() { - let stopped = self.pre_processor.clone_pending_rqs(stopped); - - self.switch_phase(ConsensusPhase::SyncPhase); - - self.synchronizer.begin_view_change( - Some(stopped), - &*self.node, - &self.timeouts, - &self.message_log, - ); - } - }; - - Ok(OPExecResult::MessageProcessedNoUpdate) - } } impl PermissionedOrderingProtocol for PBFTOrderProtocol diff --git a/febft-pbft-consensus/src/bft/proposer/mod.rs b/febft-pbft-consensus/src/bft/proposer/mod.rs index 5e1f974c..e5c97fe1 100644 --- a/febft-pbft-consensus/src/bft/proposer/mod.rs +++ b/febft-pbft-consensus/src/bft/proposer/mod.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, MutexGuard}; use std::thread::JoinHandle; use std::time::{Duration, Instant}; -use log::{debug, error, info, warn}; +use tracing::{debug, error, info, warn}; use atlas_common::channel::TryRecvError; use atlas_common::node_id::NodeId; @@ -14,7 +14,7 @@ use atlas_communication::message::StoredMessage; use atlas_core::messages::{ClientRqInfo, SessionBased}; use atlas_core::ordering_protocol::networking::OrderProtocolSendNode; use atlas_core::request_pre_processing::{BatchOutput, PreProcessorOutputMessage}; -use atlas_core::timeouts::Timeouts; +use atlas_core::timeouts::timeout::TimeoutModHandle; use atlas_metrics::metrics::{metric_duration, metric_increment, metric_store_count}; use crate::bft::config::ProposerConfig; @@ -46,7 +46,7 @@ where /// Network Node node_ref: Arc, synchronizer: Arc>, - timeouts: Timeouts, + timeouts: TimeoutModHandle, consensus_guard: Arc, // Should we shut down? 
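// [Editor's note] The block above moves timeout handling out of
// `OrderingProtocol` and into the new `TimeoutableMod` trait, keyed by
// `mod_name()` so a shared timeout service can route expired `ModTimeout`s back
// to the module that registered them. A hypothetical reduction of the trait's
// shape as this diff uses it -- the real definition lives in atlas-core, and the
// placeholder types below are not from the source:
use std::sync::Arc;

struct ModTimeout; // stand-in for atlas-core's timeout payload

trait TimeoutableMod<Out> {
    fn mod_name() -> Arc<str>;
    fn handle_timeout(&mut self, timeouts: Vec<ModTimeout>) -> Result<Out, String>;
}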
cancelled: AtomicBool, @@ -92,7 +92,7 @@ where node: Arc, batch_input: BatchOutput, sync: Arc>, - timeouts: Timeouts, + timeouts: TimeoutModHandle, consensus_guard: Arc, proposer_config: ProposerConfig, ) -> Arc { diff --git a/febft-pbft-consensus/src/bft/sync/mod.rs b/febft-pbft-consensus/src/bft/sync/mod.rs index 8fa095be..83a7a365 100755 --- a/febft-pbft-consensus/src/bft/sync/mod.rs +++ b/febft-pbft-consensus/src/bft/sync/mod.rs @@ -12,7 +12,7 @@ use std::{ use either::Either; use getset::Getters; use intmap::IntMap; -use log::{debug, error, info, warn}; +use tracing::{debug, error, info, warn}; #[cfg(feature = "serialize_serde")] use serde::{Deserialize, Serialize}; @@ -33,7 +33,7 @@ use atlas_core::ordering_protocol::reconfigurable_order_protocol::Reconfiguratio use atlas_core::ordering_protocol::{unwrap_shareable_message, ShareableMessage}; use atlas_core::request_pre_processing::RequestPreProcessor; -use atlas_core::timeouts::{RqTimeout, Timeouts}; +use atlas_core::timeouts::timeout::{ModTimeout, TimeoutModHandle}; use crate::bft::consensus::{Consensus, ConsensusStatus}; use crate::bft::log::decisions::{CollectData, Proof, ViewDecisionPair}; @@ -290,7 +290,8 @@ impl TboQueue { /// Verifies if we have new `STOP` messages to be processed for /// the current view. pub fn can_process_stops(&self) -> bool { - self.stop.front() + self.stop + .front() .map(|deque| !deque.is_empty()) .unwrap_or(false) } @@ -298,7 +299,8 @@ impl TboQueue { /// Verifies if we have new `STOP` messages to be processed for /// the current view. pub fn can_process_stop_data(&self) -> bool { - self.stop_data.front() + self.stop_data + .front() .map(|deque| !deque.is_empty()) .unwrap_or(false) } @@ -306,7 +308,8 @@ impl TboQueue { /// Verifies if we have new `STOP` messages to be processed for /// the current view. pub fn can_process_sync(&self) -> bool { - self.sync.front() + self.sync + .front() .map(|deque| !deque.is_empty()) .unwrap_or(false) } @@ -434,8 +437,8 @@ pub enum SyncReconfigurationResult { ///A trait describing some of the necessary methods for the synchronizer pub trait AbstractSynchronizer - where - RQ: SerType, +where + RQ: SerType, { /// Returns information regarding the current view, such as /// the number of faulty replicas the system can tolerate. @@ -486,8 +489,8 @@ pub struct Synchronizer { unsafe impl Sync for Synchronizer {} impl AbstractSynchronizer for Synchronizer - where - RQ: SerType + SessionBased + 'static, +where + RQ: SerType + SessionBased + 'static, { /// Returns some information regarding the current view, such as /// the number of faulty replicas the system can tolerate. @@ -506,7 +509,9 @@ impl AbstractSynchronizer for Synchronizer let current_view = self.view(); if let Some(previous_view) = view.previous_view() { - if current_view.sequence_number() != previous_view.sequence_number() && !self.tbo.lock().unwrap().install_view(previous_view) { + if current_view.sequence_number() != previous_view.sequence_number() + && !self.tbo.lock().unwrap().install_view(previous_view) + { // If we don't install a new view, then we don't want to forget our current state now do we? 
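// [Editor's note] The three reflowed `can_process_*` guards above share one
// shape: peek at the head of the threshold-ordered (TBO) queue and treat "no
// front deque" and "empty front deque" alike. Standalone equivalent:
use std::collections::VecDeque;

fn has_queued_messages<T>(tbo_queue: &VecDeque<VecDeque<T>>) -> bool {
    tbo_queue
        .front()
        .map(|deque| !deque.is_empty())
        .unwrap_or(false)
}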
debug!("Replacing our phase with Init"); @@ -554,8 +559,8 @@ impl AbstractSynchronizer for Synchronizer } impl Synchronizer - where - RQ: SerType + SessionBased + 'static, +where + RQ: SerType + SessionBased + 'static, { pub fn new_follower(node_id: NodeId, view: ViewInfo) -> Arc { Arc::new(Self { @@ -718,14 +723,14 @@ impl Synchronizer pub fn process_message( &self, s_message: ShareableMessage>, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, log: &mut Log, rq_pre_processor: &RequestPreProcessor, consensus: &mut Consensus, node: &Arc, ) -> SynchronizerStatus - where - NT: OrderProtocolSendNode> + 'static, + where + NT: OrderProtocolSendNode> + 'static, { debug!( "{:?} // Processing view change message {:?} in phase {:?} from {:?}", @@ -792,28 +797,28 @@ impl Synchronizer let i = match message.kind() { ViewChangeMessageKind::Stop(_) | ViewChangeMessageKind::StopQuorumJoin(_) - if msg_seq != next_seq => - { - debug!("{:?} // Received stop message {:?} that does not match up to our local view {:?}", node.id(), message, current_view); + if msg_seq != next_seq => + { + debug!("{:?} // Received stop message {:?} that does not match up to our local view {:?}", node.id(), message, current_view); - let mut guard = self.tbo.lock().unwrap(); + let mut guard = self.tbo.lock().unwrap(); - guard.queue_stop(s_message); + guard.queue_stop(s_message); - return stop_status!(i, ¤t_view); - } + return stop_status!(i, ¤t_view); + } ViewChangeMessageKind::Stop(_) - if self.stopped.borrow().contains_key(header.from().into()) => - { - warn!( + if self.stopped.borrow().contains_key(header.from().into()) => + { + warn!( "{:?} // Received double stop message from node {:?}", node.id(), header.from() ); - // drop attempts to vote twice - return stop_status!(i, ¤t_view); - } + // drop attempts to vote twice + return stop_status!(i, ¤t_view); + } ViewChangeMessageKind::StopQuorumJoin(_) => { warn!("{:?} // Received stop quorum join message while in stopping state. 
Ignoring", node.id()); @@ -938,16 +943,16 @@ impl Synchronizer let (received, node_id) = match message.kind() { ViewChangeMessageKind::Stop(_) | ViewChangeMessageKind::StopQuorumJoin(_) - if msg_seq != next_seq => - { - debug!("{:?} // Received stop message {:?} that does not match up to our local view {:?}", node.id(), message, current_view); + if msg_seq != next_seq => + { + debug!("{:?} // Received stop message {:?} that does not match up to our local view {:?}", node.id(), message, current_view); - let mut guard = self.tbo.lock().unwrap(); + let mut guard = self.tbo.lock().unwrap(); - guard.queue_stop(s_message); + guard.queue_stop(s_message); - return stop_status!(received, ¤t_view); - } + return stop_status!(received, ¤t_view); + } ViewChangeMessageKind::Stop(_requests) => { let mut guard = self.tbo.lock().unwrap(); @@ -1136,21 +1141,21 @@ impl Synchronizer return SynchronizerStatus::Running; } ViewChangeMessageKind::StopData(_) - if next_view.leader() != node.id() => - { - warn!("{:?} // Received stop data message but we are not the leader of the current view", + if next_view.leader() != node.id() => + { + warn!("{:?} // Received stop data message but we are not the leader of the current view", node.id()); - //If we are not the leader, ignore - return SynchronizerStatus::Running; - } + //If we are not the leader, ignore + return SynchronizerStatus::Running; + } ViewChangeMessageKind::StopData(_) - if collects_guard.contains_key(header.from().into()) => - { - warn!("{:?} // Received stop data message but we have already received one from this node", + if collects_guard.contains_key(header.from().into()) => + { + warn!("{:?} // Received stop data message but we have already received one from this node", node.id()); - // drop attempts to vote twice - return SynchronizerStatus::Running; - } + // drop attempts to vote twice + return SynchronizerStatus::Running; + } ViewChangeMessageKind::StopData(_) => { // The message is related to the view we are awaiting // In order to reach this point, we must be the leader of the current view, @@ -1265,7 +1270,7 @@ impl Synchronizer Some(digest), Some(&*node_sign), ) - .into_inner(); + .into_inner(); (h, message) }; @@ -1376,11 +1381,11 @@ impl Synchronizer return SynchronizerStatus::Running; } ViewChangeMessageKind::Sync(_) - if s_message.header().from() != next_view.leader() => - { - //You're not the leader, what are you saying - return SynchronizerStatus::Running; - } + if s_message.header().from() != next_view.leader() => + { + //You're not the leader, what are you saying + return SynchronizerStatus::Running; + } ViewChangeMessageKind::Sync(_) => { let stored_message = unwrap_shareable_message(s_message); @@ -1452,12 +1457,12 @@ impl Synchronizer pub fn resume_view_change( &self, log: &mut Log, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, consensus: &mut Consensus, node: &Arc, ) -> Option> - where - NT: OrderProtocolSendNode> + 'static, + where + NT: OrderProtocolSendNode> + 'static, { let state = self.finalize_state.borrow_mut().take()?; @@ -1482,11 +1487,11 @@ impl Synchronizer &self, joining_node: NodeId, node: &NT, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, log: &Log, ) -> SyncReconfigurationResult - where - NT: OrderProtocolSendNode>, + where + NT: OrderProtocolSendNode>, { let current_view = self.view(); @@ -1544,10 +1549,10 @@ impl Synchronizer pub fn attempt_join_quorum( &self, node: &NT, - _timeouts: &Timeouts, + _timeouts: &TimeoutModHandle, ) -> ReconfigurationAttemptResult - where - NT: OrderProtocolSendNode>, + 
where + NT: OrderProtocolSendNode>, { let current_view = self.view(); @@ -1595,7 +1600,7 @@ impl Synchronizer &self, join_cert: Option, node: &NT, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, _log: &Log, ) where NT: OrderProtocolSendNode>, @@ -1657,7 +1662,7 @@ impl Synchronizer &self, timed_out: Option>>, node: &NT, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, _log: &Log, ) where NT: OrderProtocolSendNode>, @@ -1764,12 +1769,12 @@ impl Synchronizer &self, state: FinalizeState, log: &mut Log, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, consensus: &mut Consensus, node: &Arc, ) -> SynchronizerStatus - where - NT: OrderProtocolSendNode> + 'static, + where + NT: OrderProtocolSendNode> + 'static, { let FinalizeState { curr_cid, @@ -1809,7 +1814,6 @@ impl Synchronizer // We are missing the last decision, which should be included in the collect data // sent by the leader in the SYNC message - if let Some(last_proof) = last_proof { let quorum_result = consensus .catch_up_to_quorum(&view, last_proof, log) @@ -1853,7 +1857,7 @@ impl Synchronizer &self, header: &Header, pre_prepare: &ConsensusMessage, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, ) -> Vec { match &self.accessory { SynchronizerAccessory::Replica(rep) => { @@ -1865,7 +1869,7 @@ impl Synchronizer /// Watch requests that have been received from other replicas /// - pub fn watch_received_requests(&self, digest: Vec, timeouts: &Timeouts) { + pub fn watch_received_requests(&self, digest: Vec, timeouts: &TimeoutModHandle) { match &self.accessory { SynchronizerAccessory::Replica(rep) => { rep.watch_received_requests(digest, timeouts); @@ -1878,8 +1882,8 @@ impl Synchronizer /// So that everyone knows about (including a leader that could still be correct, but /// Has not received the requests from the client) pub fn forward_requests(&self, timed_out: Vec>, node: &NT) - where - NT: OrderProtocolSendNode>, + where + NT: OrderProtocolSendNode>, { match &self.accessory { SynchronizerAccessory::Follower(_) => {} @@ -1894,7 +1898,7 @@ impl Synchronizer pub fn client_requests_timed_out( &self, my_id: NodeId, - seq: &Vec, + seq: &Vec, ) -> SynchronizerStatus { match &self.accessory { SynchronizerAccessory::Follower(_) => SynchronizerStatus::Nil, @@ -1910,7 +1914,7 @@ impl Synchronizer fn normalized_collects( collects: &IntMap>>, in_exec: SeqNo, - ) -> impl Iterator>> { + ) -> impl Iterator>> { let values = collects.values(); let collects = normalized_collects(in_exec, collect_data(values)); @@ -1925,8 +1929,8 @@ impl Synchronizer view: &ViewInfo, node: &NT, ) -> Option<&'a Proof> - where - NT: OrderProtocolSendNode>, + where + NT: OrderProtocolSendNode>, { highest_proof::(view, node, guard.values()) } @@ -2152,8 +2156,8 @@ fn certified_value( } fn collect_data<'a, O: 'a>( - collects: impl Iterator>>, -) -> impl Iterator> { + collects: impl Iterator>>, +) -> impl Iterator> { collects.filter_map(|stored| match stored.message().view_change().kind() { ViewChangeMessageKind::StopData(collects) => Some(collects), _ => None, @@ -2162,8 +2166,8 @@ fn collect_data<'a, O: 'a>( fn normalized_collects<'a, O: 'a>( in_exec: SeqNo, - collects: impl Iterator>, -) -> impl Iterator>> { + collects: impl Iterator>, +) -> impl Iterator>> { collects.map(move |collect| { if collect.incomplete_proof().executing() == in_exec { Some(collect) @@ -2177,9 +2181,9 @@ fn signed_collects( node: &NT, collects: Vec>>, ) -> Vec>> - where - RQ: SerType, - NT: OrderProtocolSendNode>, +where + RQ: SerType, + NT: OrderProtocolSendNode>, { collects 
.into_iter() @@ -2188,9 +2192,9 @@ fn signed_collects( } fn validate_signature<'a, RQ, M, NT>(node: &'a NT, stored: &'a StoredMessage) -> bool - where - RQ: SerType, - NT: OrderProtocolSendNode>, +where + RQ: SerType, + NT: OrderProtocolSendNode>, { //TODO: Fix this as I believe it will always be false let wm = match WireMessage::from_header(*stored.header(), MessageModule::Protocol) { @@ -2224,10 +2228,10 @@ fn validate_signature<'a, RQ, M, NT>(node: &'a NT, stored: &'a StoredMessage) } fn highest_proof<'a, RQ, I, NT>(view: &ViewInfo, node: &NT, collects: I) -> Option<&'a Proof> - where - RQ: SerType, - I: Iterator>>, - NT: OrderProtocolSendNode>, +where + RQ: SerType, + I: Iterator>>, + NT: OrderProtocolSendNode>, { collect_data(collects) // fetch proofs diff --git a/febft-pbft-consensus/src/bft/sync/replica_sync/mod.rs b/febft-pbft-consensus/src/bft/sync/replica_sync/mod.rs index 6acb2f7e..f4f3c13f 100644 --- a/febft-pbft-consensus/src/bft/sync/replica_sync/mod.rs +++ b/febft-pbft-consensus/src/bft/sync/replica_sync/mod.rs @@ -7,7 +7,7 @@ use std::cell::Cell; use std::marker::PhantomData; use std::time::{Duration, Instant}; -use log::{debug, error, info}; +use tracing::{debug, error, info}; use atlas_common::collections; use atlas_common::node_id::NodeId; @@ -17,7 +17,8 @@ use atlas_communication::message::{Header, StoredMessage}; use atlas_core::messages::{ClientRqInfo, ForwardedRequestsMessage, SessionBased}; use atlas_core::ordering_protocol::networking::OrderProtocolSendNode; use atlas_core::request_pre_processing::{PreProcessorMessage, RequestPreProcessor}; -use atlas_core::timeouts::{RqTimeout, TimeoutKind, TimeoutPhase, Timeouts}; +use atlas_core::timeouts::timeout::{ModTimeout, TimeoutModHandle}; +use atlas_core::timeouts::{TimeOutable, TimeoutID}; use atlas_metrics::metrics::{metric_duration, metric_increment}; use crate::bft::consensus::Consensus; @@ -66,7 +67,7 @@ impl ReplicaSynchronizer { consensus: &Consensus, log: &Log, pre_processor: &RequestPreProcessor, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, node: &NT, ) where NT: OrderProtocolSendNode>, @@ -114,7 +115,7 @@ impl ReplicaSynchronizer { pub(super) fn handle_begin_view_change( &self, base_sync: &Synchronizer, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, node: &NT, timed_out: Option>>, ) where @@ -151,7 +152,7 @@ impl ReplicaSynchronizer { pub(super) fn handle_begin_quorum_view_change( &self, base_sync: &Synchronizer, - _timeouts: &Timeouts, + _timeouts: &TimeoutModHandle, node: &NT, join_cert: NodeId, ) where @@ -175,10 +176,19 @@ impl ReplicaSynchronizer { } /// Watch a vector of requests received - pub fn watch_received_requests(&self, requests: Vec, timeouts: &Timeouts) { + pub fn watch_received_requests( + &self, + requests: Vec, + timeouts: &TimeoutModHandle, + ) { let start_time = Instant::now(); - timeouts.timeout_client_requests(self.timeout_dur.get(), requests); + let _ = timeouts.request_timeouts( + transform_client_rq_to_timeouts(requests), + self.timeout_dur.get(), + 1, + true, + ); metric_duration(SYNC_WATCH_REQUESTS_ID, start_time.elapsed()); } @@ -190,7 +200,7 @@ impl ReplicaSynchronizer { &self, header: &Header, pre_prepare: &ConsensusMessage, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, ) -> Vec { let start_time = Instant::now(); @@ -224,7 +234,8 @@ impl ReplicaSynchronizer { //Notify the timeouts that we have received the following requests //TODO: Should this only be done after the commit phase? 
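// [Editor's note] `watch_received_requests` above now funnels through the
// generic timeout API: each `ClientRqInfo` becomes a `(TimeoutID, payload)` pair
// via `transform_client_rq_to_timeouts` (defined at the bottom of this file),
// registered together with a duration, an ack threshold, and a flag. The meaning
// of the trailing `1, true` arguments is inferred from this diff, not from
// atlas-core documentation:
//
//     let _ = timeouts.request_timeouts(
//         transform_client_rq_to_timeouts(requests),
//         self.timeout_dur.get(), // how long until the timeout fires
//         1,                      // acks needed to satisfy it (inferred)
//         true,                   // treat acks cumulatively (inferred)
//     );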
- timeouts.received_pre_prepare(sending_node, timeout_info); + + let _ = timeouts.acks_received(transform_client_rq_to_timeouts_ack(timeout_info, sending_node)); metric_duration(SYNC_BATCH_RECEIVED_ID, start_time.elapsed()); @@ -236,7 +247,7 @@ impl ReplicaSynchronizer { &self, base_sync: &Synchronizer, pre_processor: &RequestPreProcessor, - timeouts: &Timeouts, + timeouts: &TimeoutModHandle, ) { // TODO: maybe optimize this `stopped_requests` call, to avoid // a heap allocation of a `Vec`? @@ -245,7 +256,7 @@ impl ReplicaSynchronizer { let requests = self.drain_stopped_request(base_sync); - let rq_info = requests.iter().map(ClientRqInfo::from).collect(); + let rq_info = requests.iter().map(ClientRqInfo::from).collect::>(); let count = requests.len(); @@ -254,7 +265,12 @@ impl ReplicaSynchronizer { .send_return(PreProcessorMessage::StoppedRequests(requests)) .unwrap(); - timeouts.timeout_client_requests(self.timeout_dur.get(), rq_info); + let _ = timeouts.request_timeouts( + transform_client_rq_to_timeouts(rq_info), + self.timeout_dur.get(), + 1, + true, + ); debug!("Registering {} stopped requests", count); @@ -263,15 +279,15 @@ impl ReplicaSynchronizer { } /// Stop watching all pending client requests. - pub fn unwatch_all_requests(&self, timeouts: &Timeouts) { - timeouts.cancel_client_rq_timeouts(None); + pub fn unwatch_all_requests(&self, timeouts: &TimeoutModHandle) { + let _ = timeouts.cancel_all_timeouts(); } /// Restart watching all pending client requests. /// This happens when a new leader has been elected and /// We must now give him some time to propose all of the requests - pub fn watch_all_requests(&self, timeouts: &Timeouts) { - timeouts.reset_all_client_rq_timeouts(self.timeout_dur.get()); + pub fn watch_all_requests(&self, timeouts: &TimeoutModHandle) { + let _ = timeouts.reset_all_timeouts(); } /// Handle a timeout received from the timeouts layer. @@ -281,7 +297,7 @@ impl ReplicaSynchronizer { &self, base_sync: &Synchronizer, my_id: NodeId, - timed_out_rqs: &Vec, + timed_out_rqs: &Vec, ) -> SynchronizerStatus { //// iterate over list of watched pending requests, //// and select the ones to be stopped or forwarded @@ -304,24 +320,28 @@ impl ReplicaSynchronizer { ); for timed_out_rq in timed_out_rqs { - match timed_out_rq.timeout_phase() { - TimeoutPhase::TimedOut(id, _time) => { - let timeout = timed_out_rq.timeout_kind(); - - let rq_info = match timeout { - TimeoutKind::ClientRequestTimeout(rq) => rq, - _ => unreachable!( - "Only client requests should be timed out at the synchronizer" - ), - }; - - if *id == 0 { - forwarded.push(rq_info.clone()); - } else if *id >= 1 { - // The second timeout generates a stopped request - stopped.push(rq_info.clone()); - } + if timed_out_rq.extra_info().is_none() { + continue; + } + + let cli_rq = timed_out_rq + .extra_info() + .unwrap() + .as_any() + .downcast_ref::(); + + if cli_rq.is_none() { + continue; + } + + match timed_out_rq.timeout_count() { + 1 => { + forwarded.push(cli_rq.cloned().unwrap()); + } + 2.. => { + stopped.push(cli_rq.cloned().unwrap()); } + _ => {} } } @@ -457,10 +477,47 @@ impl ReplicaSynchronizer { } } -///Justification/Sort of correction proof: +/// # Safety ///In general, all fields and methods will be accessed by the replica thread, never by the client rq thread. /// Therefore, we only have to protect the fields that will be accessed by both clients and replicas. /// So we protect collects, watching and tbo as those are the fields that are going to be /// accessed by both those threads. 
/// Since the other fields are going to be accessed by just 1 thread, we just need them to be Send, which they are unsafe impl Sync for ReplicaSynchronizer {} + +fn transform_client_rq_to_timeouts( + client_rq: impl IntoIterator, +) -> Vec<(TimeoutID, Option>)> { + client_rq + .into_iter() + .map(|rq| { + ( + TimeoutID::SessionBased { + session: rq.session(), + seq_no: rq.sequence_number(), + from: rq.sender(), + }, + Some(Box::new(rq) as Box), + ) + }) + .collect() +} + +fn transform_client_rq_to_timeouts_ack( + client_rq: impl IntoIterator, + from: NodeId, +) -> Vec<(TimeoutID, NodeId)> { + client_rq + .into_iter() + .map(|rq| { + ( + TimeoutID::SessionBased { + session: rq.session(), + seq_no: rq.sequence_number(), + from: rq.sender(), + }, + from, + ) + }) + .collect() +} \ No newline at end of file diff --git a/febft-pbft-consensus/src/bft/sync/view/mod.rs b/febft-pbft-consensus/src/bft/sync/view/mod.rs index 3119c61b..aaa1b895 100644 --- a/febft-pbft-consensus/src/bft/sync/view/mod.rs +++ b/febft-pbft-consensus/src/bft/sync/view/mod.rs @@ -13,10 +13,10 @@ use atlas_common::Err; use atlas_core::ordering_protocol::networking::serialize::NetworkView; use num_bigint::BigUint; use num_bigint::ToBigUint; -use num_traits::identities::Zero; #[cfg(feature = "serialize_serde")] use serde::{Deserialize, Serialize}; use std::ops::{Add, Div}; +use num_traits::Zero; use thiserror::Error; /// This struct contains information related with an @@ -65,7 +65,6 @@ impl NetworkView for ViewInfo { } } - const LEADER_COUNT: usize = 1; impl ViewInfo { @@ -81,7 +80,7 @@ impl ViewInfo { let destined_leader = quorum_members[(usize::from(seq)) % n]; let mut leader_set = vec![destined_leader]; - + for i in 1..LEADER_COUNT { leader_set.push(quorum_members[(usize::from(seq) + i) % n]); } diff --git a/febft-pbft-consensus/src/lib.rs b/febft-pbft-consensus/src/lib.rs index e96571a1..6bdd7a5d 100644 --- a/febft-pbft-consensus/src/lib.rs +++ b/febft-pbft-consensus/src/lib.rs @@ -1,6 +1,5 @@ //! This crate, `febft`, implements a byzantine fault tolerant state machine -//! replication library in Rust, whilst taking advantage of the feature flags -//! in `Cargo.toml` to provide a super flexible, modular API. +//! replication library in Rust. //! //! # Feature flags //! 
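// [Editor's note] The reworked `client_requests_timed_out` above recovers the
// original `ClientRqInfo` from the type-erased timeout payload with an
// `as_any()/downcast_ref` round trip, then classifies by `timeout_count()`
// (first expiry -> forward the request, repeat expiries -> stop). A minimal,
// self-contained sketch of the downcast half, with local stand-ins for the
// atlas-core trait and request type:
use std::any::Any;

trait ExtraInfo {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Clone, Debug)]
struct RqInfo {
    session: u32,
}

impl ExtraInfo for RqInfo {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn recover_request(info: &dyn ExtraInfo) -> Option<RqInfo> {
    // Fails (returns None) if the payload is not actually an RqInfo.
    info.as_any().downcast_ref::<RqInfo>().cloned()
}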
diff --git a/febft-state-transfer/Cargo.toml b/febft-state-transfer/Cargo.toml index 5cfafea7..8acd8def 100644 --- a/febft-state-transfer/Cargo.toml +++ b/febft-state-transfer/Cargo.toml @@ -18,7 +18,8 @@ anyhow = "1.0" thiserror = "1.0" serde = { version = "*", optional = true } capnp = { version = "0.16.1", optional = true } -log = "0.4.17" +lazy_static = "*" +tracing = "*" atlas-common = { path = "../../Atlas/Atlas-Common" } atlas-communication = { path = "../../Atlas/Atlas-Communication" } diff --git a/febft-state-transfer/src/lib.rs b/febft-state-transfer/src/lib.rs index 40cb0b8f..d24e1b6e 100644 --- a/febft-state-transfer/src/lib.rs +++ b/febft-state-transfer/src/lib.rs @@ -1,16 +1,15 @@ - use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::anyhow; -use log::{debug, error, info, warn}; +use lazy_static::lazy_static; +use tracing::{debug, error, info, warn}; #[cfg(feature = "serialize_serde")] use serde::{Deserialize, Serialize}; use thiserror::Error; -use atlas_common::{collections, Err}; use atlas_common::channel::ChannelSyncTx; use atlas_common::collections::HashMap; use atlas_common::crypto::hash::Digest; @@ -18,31 +17,37 @@ use atlas_common::error::*; use atlas_common::globals::ReadOnly; use atlas_common::node_id::NodeId; use atlas_common::ordering::{Orderable, SeqNo}; +use atlas_common::{collections, Err}; use atlas_communication::message::{Header, StoredMessage}; -use atlas_core::ordering_protocol::{ExecutionResult, OrderingProtocol}; use atlas_core::ordering_protocol::networking::serialize::NetworkView; +use atlas_core::ordering_protocol::{ExecutionResult, OrderingProtocol}; use atlas_core::persistent_log::{OperationMode, PersistableStateTransferProtocol}; -use atlas_core::timeouts::{RqTimeout, TimeoutKind, Timeouts}; +use atlas_core::timeouts::timeout::{ModTimeout, TimeoutModHandle, TimeoutableMod}; +use atlas_core::timeouts::{TimeoutID, TimeoutsHandle}; use atlas_metrics::metrics::metric_duration; use atlas_smr_application::state::monolithic_state::{InstallStateMessage, MonolithicState}; use atlas_smr_core::persistent_log::MonolithicStateLog; -use atlas_smr_core::state_transfer::{ - Checkpoint, CstM, StateTransferProtocol, STPollResult, STResult, STTimeoutResult, -}; use atlas_smr_core::state_transfer::monolithic_state::{ MonolithicStateTransfer, MonolithicStateTransferInitializer, }; use atlas_smr_core::state_transfer::networking::StateTransferSendNode; +use atlas_smr_core::state_transfer::{ + Checkpoint, CstM, STPollResult, STResult, STTimeoutResult, StateTransferProtocol, +}; use crate::config::StateTransferConfig; -use crate::message::{CstMessage, CstMessageKind}; use crate::message::serialize::CSTMsg; +use crate::message::{CstMessage, CstMessageKind}; use crate::metrics::STATE_TRANSFER_STATE_INSTALL_CLONE_TIME_ID; pub mod config; pub mod message; pub mod metrics; +lazy_static!( + static ref MOD_NAME: Arc = Arc::from("ST_TRANSFER"); +); + /// The state of the checkpoint pub enum CheckpointState { // no checkpoint has been performed yet @@ -138,7 +143,7 @@ where current_checkpoint_state: CheckpointState, base_timeout: Duration, curr_timeout: Duration, - timeouts: Timeouts, + timeouts: TimeoutModHandle, // NOTE: remembers whose replies we have // received already, to avoid replays //voted: HashSet, @@ -228,6 +233,29 @@ macro_rules! 
getmessage { }}; } +impl TimeoutableMod for CollabStateTransfer +where + S: MonolithicState + 'static, + PL: MonolithicStateLog + 'static, + NT: StateTransferSendNode> + 'static, +{ + fn mod_name() -> Arc { + MOD_NAME.clone() + } + + fn handle_timeout(&mut self, timeout: Vec) -> Result { + for cst_seq in timeout { + if let TimeoutID::SeqNoBased(seq_no) = cst_seq.id() { + /*if self.cst_request_timed_out(*seq_no, ) { + return Ok(STTimeoutResult::RunCst); + }*/ + } + } + + Ok(STTimeoutResult::CstNotNeeded) + } +} + impl StateTransferProtocol for CollabStateTransfer where S: MonolithicState + 'static, @@ -330,8 +358,10 @@ where } // Notify timeouts that we have received this message - self.timeouts - .received_cst_request(header.from(), message.sequence_number()); + self.timeouts.ack_received( + TimeoutID::SeqNoBased(message.sequence_number()), + header.from(), + )?; let status = self.process_message(view.clone(), CstProgress::Message(header, message)); @@ -405,21 +435,6 @@ where Ok(ExecutionResult::BeginCheckpoint) } - - fn handle_timeout(&mut self, view: V, timeout: Vec) -> Result - where - V: NetworkView, - { - for cst_seq in timeout { - if let TimeoutKind::Cst(cst_seq) = cst_seq.timeout_kind() { - if self.cst_request_timed_out(*cst_seq, view.clone()) { - return Ok(STTimeoutResult::RunCst); - } - } - } - - Ok(STTimeoutResult::CstNotNeeded) - } } impl MonolithicStateTransfer for CollabStateTransfer @@ -452,7 +467,7 @@ where { fn initialize( config: Self::Config, - timeouts: Timeouts, + timeouts: TimeoutModHandle, node: Arc, log: PL, executor_handle: ChannelSyncTx>, @@ -487,7 +502,7 @@ where pub fn new( node: Arc, base_timeout: Duration, - timeouts: Timeouts, + timeouts: TimeoutModHandle, persistent_log: PL, install_channel: ChannelSyncTx>, ) -> Self { @@ -548,7 +563,9 @@ where for request in reqs { // We only want to reply to the most recent requests from each of the nodes - if let std::collections::hash_map::Entry::Vacant(e) = map.entry(request.header().from()) { + if let std::collections::hash_map::Entry::Vacant(e) = + map.entry(request.header().from()) + { e.insert(request); } else { map.entry(request.header().from()).and_modify(|x| { @@ -800,7 +817,9 @@ where self.received_states.contains_key(&state_digest) ); - if let std::collections::hash_map::Entry::Vacant(e) = self.received_states.entry(state_digest) { + if let std::collections::hash_map::Entry::Vacant(e) = + self.received_states.entry(state_digest) + { e.insert(ReceivedState { count: 1, state }); } else { let current_state = self.received_states.get_mut(&state_digest).unwrap(); @@ -1012,8 +1031,13 @@ where cst_seq ); - self.timeouts - .timeout_cst_request(self.curr_timeout, view.quorum() as u32, cst_seq); + let _ = self.timeouts.request_timeout( + TimeoutID::SeqNoBased(cst_seq), + None, + self.curr_timeout, + view.quorum(), + false, + ); self.phase = ProtoPhase::ReceivingCid(0); @@ -1046,8 +1070,13 @@ where cst_seq ); - self.timeouts - .timeout_cst_request(self.curr_timeout, view.quorum() as u32, cst_seq); + let _ = self.timeouts.request_timeout( + TimeoutID::SeqNoBased(cst_seq), + None, + self.curr_timeout, + view.quorum(), + false, + ); self.phase = ProtoPhase::ReceivingState(0);
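// [Editor's note] Both `received_states` and the per-node reply map above were
// rewritten into the `Entry::Vacant` insert-or-update shape, which needs only
// one hash lookup on the insert path. Standalone equivalent of the pattern:
use std::collections::hash_map::Entry;
use std::collections::HashMap;

fn record_state_vote(counts: &mut HashMap<u64, u32>, state_digest: u64) {
    if let Entry::Vacant(e) = counts.entry(state_digest) {
        e.insert(1); // first replica to report this digest
    } else {
        *counts.get_mut(&state_digest).unwrap() += 1; // digest already tracked
    }
}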