Skip to content

Commit

Permalink
Merge branch 'nuno1212s:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
nuno1212s authored Nov 20, 2023
2 parents 45d0d59 + 9b9c99f commit f23f60f
Show file tree
Hide file tree
Showing 30 changed files with 3,047 additions and 2,769 deletions.
14 changes: 7 additions & 7 deletions febft-pbft-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ default = [
]

serialize_serde = ["atlas-capnp", "serde", "serde_bytes", "bincode", "atlas-common/serialize_serde",
"atlas-execution/serialize_serde", "atlas-communication/serialize_serde", "atlas-core/serialize_serde"]
"atlas-smr-application/serialize_serde", "atlas-communication/serialize_serde", "atlas-core/serialize_serde"]
serialize_capnp = ["atlas-capnp"]

[dev-dependencies]
Expand All @@ -38,12 +38,12 @@ mimalloc = { version = "*", default-features = false }
rand = {version = "0.8.5", features = ["small_rng"] }

[dependencies]
atlas-common = { path = "../../Atlas/atlas-common" }
atlas-communication = { path = "../../Atlas/atlas-communication" }
atlas-execution = { path = "../../Atlas/atlas-execution" }
atlas-core = { path = "../../Atlas/atlas-core" }
atlas-capnp = { path = "../../Atlas/atlas-capnp", optional = true }
atlas-metrics = {path = "../../Atlas/atlas-metrics"}
atlas-common = { path = "../../Atlas/Atlas-Common" }
atlas-communication = { path = "../../Atlas/Atlas-Communication" }
atlas-smr-application = { path = "../../Atlas/Atlas-SMR-Application" }
atlas-core = { path = "../../Atlas/Atlas-Core" }
atlas-capnp = { path = "../../Atlas/Atlas-capnp", optional = true }
atlas-metrics = {path = "../../Atlas/Atlas-Metrics"}

capnp = { version = "0.16" }
fastrand = "1.7.0"
Expand Down
25 changes: 8 additions & 17 deletions febft-pbft-consensus/src/bft/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use atlas_common::globals::ReadOnly;

use atlas_common::node_id::NodeId;
use atlas_communication::Node;
use atlas_execution::ExecutorHandle;
use atlas_execution::serialize::SharedData;
use atlas_core::followers::FollowerHandle;
use atlas_core::serialize::{OrderingProtocolMessage, StateTransferMessage, ServiceMsg};
use atlas_core::state_transfer::Checkpoint;
use atlas_core::timeouts::Timeouts;
use atlas_smr_application::serialize::ApplicationData;

use crate::bft::message::serialize::PBFTConsensus;
use crate::bft::observer::ObserverHandle;
use crate::bft::sync::view::ViewInfo;

pub struct PBFTConfig<D: SharedData, ST> {
pub struct PBFTConfig<D>
where D: ApplicationData + 'static {
pub node_id: NodeId,
// pub observer_handle: ObserverHandle,
pub follower_handle: Option<FollowerHandle<PBFTConsensus<D>>>,
pub follower_handle: Option<FollowerHandle<D, PBFTConsensus<D>, PBFTConsensus<D>>>,
pub view: ViewInfo,
pub timeout_dur: Duration,
pub proposer_config: ProposerConfig,
pub watermark: u32,
pub _phantom_data: PhantomData<ST>,
}

impl<D: SharedData + 'static,
ST: StateTransferMessage + 'static> PBFTConfig<D, ST> {
impl<D: ApplicationData + 'static, > PBFTConfig<D> {
pub fn new(node_id: NodeId,
follower_handle: Option<FollowerHandle<PBFTConsensus<D>>>,
follower_handle: Option<FollowerHandle<D, PBFTConsensus<D>, PBFTConsensus<D>>>,
view: ViewInfo, timeout_dur: Duration,
watermark: u32, proposer_config: ProposerConfig) -> Self {
Self {
Expand All @@ -39,7 +31,6 @@ impl<D: SharedData + 'static,
timeout_dur,
proposer_config,
watermark,
_phantom_data: Default::default(),
}
}
}
Expand Down
115 changes: 57 additions & 58 deletions febft-pbft-consensus/src/bft/consensus/accessory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,136 +1,135 @@
use atlas_common::crypto::hash::Digest;
use atlas_common::ordering::SeqNo;
use atlas_communication::Node;
use atlas_execution::serialize::SharedData;
use atlas_core::serialize::StateTransferMessage;
use std::sync::Arc;
use atlas_communication::message::Header;

use atlas_communication::protocol_node::ProtocolNetworkNode;
use atlas_core::ordering_protocol::networking::OrderProtocolSendNode;
use atlas_smr_application::serialize::ApplicationData;

use crate::bft::consensus::accessory::replica::ReplicaAccessory;
use crate::bft::log::deciding::WorkingDecisionLog;
use crate::bft::message::ConsensusMessage;
use crate::bft::msg_log::deciding_log::DecidingLog;
use crate::bft::msg_log::decisions::StoredConsensusMessage;
use crate::bft::PBFT;
use crate::bft::message::serialize::PBFTConsensus;
use crate::bft::sync::view::ViewInfo;

pub mod replica;

pub enum ConsensusDecisionAccessory<D: SharedData + 'static, ST: StateTransferMessage + 'static> {
pub enum ConsensusDecisionAccessory<D>
where D: ApplicationData + 'static, {
Follower,
Replica(ReplicaAccessory<D, ST>),
Replica(ReplicaAccessory<D>),
}

pub trait AccessoryConsensus<D, ST> where D: SharedData + 'static,
ST: StateTransferMessage + 'static {

pub trait AccessoryConsensus<D> where D: ApplicationData + 'static, {
/// Handle the reception of a pre-prepare message without having completed the pre prepare phase
fn handle_partial_pre_prepare<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_partial_pre_prepare<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;

/// Handle the prepare phase having been completed
fn handle_pre_prepare_phase_completed<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_pre_prepare_phase_completed<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &Arc<NT>) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;

/// Handle a prepare message processed during the preparing phase without having
/// reached a quorum
fn handle_preparing_no_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_preparing_no_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;

/// Handle a prepare message processed during the prepare phase when a quorum
/// has been achieved
fn handle_preparing_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_preparing_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;

/// Handle a commit message processed during the preparing phase without having
/// reached a quorum
fn handle_committing_no_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_committing_no_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;

/// Handle a commit message processed during the prepare phase when a quorum has been reached
fn handle_committing_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_committing_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>>;
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static;
}

impl<D, ST> AccessoryConsensus<D, ST> for ConsensusDecisionAccessory<D, ST>
where D: SharedData + 'static, ST: StateTransferMessage + 'static {

fn handle_partial_pre_prepare<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
impl<D> AccessoryConsensus<D> for ConsensusDecisionAccessory<D>
where D: ApplicationData + 'static {
fn handle_partial_pre_prepare<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_partial_pre_prepare(deciding_log, view, msg, node);
rep.handle_partial_pre_prepare(deciding_log, view, header, msg, node);
}
}
}

fn handle_pre_prepare_phase_completed<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_pre_prepare_phase_completed<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &Arc<NT>) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_pre_prepare_phase_completed(deciding_log, view, msg, node);
rep.handle_pre_prepare_phase_completed(deciding_log, view, header, msg, node);
}
}
}

fn handle_preparing_no_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_preparing_no_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_preparing_no_quorum(deciding_log, view, msg, node);
rep.handle_preparing_no_quorum(deciding_log, view, header, msg, node);
}
}
}

fn handle_preparing_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_preparing_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_preparing_quorum(deciding_log, view, msg, node);
rep.handle_preparing_quorum(deciding_log, view, header, msg, node);
}
}
}

fn handle_committing_no_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_committing_no_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_committing_no_quorum(deciding_log, view, msg, node);
rep.handle_committing_no_quorum(deciding_log, view, header, msg, node);
}
}
}

fn handle_committing_quorum<NT>(&mut self, deciding_log: &DecidingLog<D::Request>,
fn handle_committing_quorum<NT>(&mut self, deciding_log: &WorkingDecisionLog<D::Request>,
view: &ViewInfo,
msg: StoredConsensusMessage<D::Request>,
node: &NT) where NT: Node<PBFT<D, ST>> {
header: &Header, msg: &ConsensusMessage<D::Request>,
node: &NT) where NT: OrderProtocolSendNode<D, PBFTConsensus<D>> + 'static {
match self {
ConsensusDecisionAccessory::Follower => {}
ConsensusDecisionAccessory::Replica(rep) => {
rep.handle_committing_quorum(deciding_log, view, msg, node);
rep.handle_committing_quorum(deciding_log, view, header, msg, node);
}
}
}
Expand Down
Loading

0 comments on commit f23f60f

Please sign in to comment.