Skip to content

Commit

Permalink
Merge branch 'nuno1212s:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
nuno1212s authored Apr 3, 2024
2 parents 717b176 + 1094045 commit f8de08a
Show file tree
Hide file tree
Showing 17 changed files with 407 additions and 323 deletions.
59 changes: 12 additions & 47 deletions febft-pbft-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,69 +23,34 @@ serialize_capnp = ["atlas-capnp"]
bincode = "2.0.0-rc.3"
num_cpus = "1"
mimalloc = { version = "*", default-features = false }
rand = {version = "0.8.5", features = ["small_rng"] }
rand = { version = "0.8.5", features = ["small_rng"] }
rand_core = "0"

[dependencies]
anyhow = "1.0"
thiserror = "1.0"
chrono = "0"
getset = "0.1.2"
lazy_static = "1"

atlas-common = { path = "../../Atlas/Atlas-Common", default-features = true }
atlas-communication = { path = "../../Atlas/Atlas-Communication" }
atlas-core = { path = "../../Atlas/Atlas-Core" }
atlas-capnp = { path = "../../Atlas/Atlas-capnp", optional = true }
atlas-metrics = {path = "../../Atlas/Atlas-Metrics"}
atlas-metrics = { path = "../../Atlas/Atlas-Metrics" }

capnp = { version = "0.16" }
fastrand = "1.7.0"
bytes = "1.4.0"
chrono = "0.4"
intmap = "2.0.0"
intmap = "2"
either = "1"
oneshot = "0.1"
futures = "0.3.21"
futures-timer = "3"
async-tls = "0.12.0"
rustls = "0.20.6"
webpki = "0.22.0"
parking_lot = "0.12.1"
dashmap = "5.1.0"
thread_local = "1.1.4"
num_cpus = "1"
socket2 = "0.4"
event-listener = "2.5.2"
linked-hash-map = "0.5"
rand_core = { version = "0.6", features = ["getrandom"] }
smallvec = { version = "1", features = ["union", "write", "const_generics"] }
async-std = { version = "1", optional = true }
tokio = { version = "1.25.0", features = ["full"], optional = true }
tokio-util = { version = "0.7.1", features = ["compat"], optional = true }
tokio-metrics = { version = "0.1.0", optional = true }
ring = { version = "0.17.8", optional = true }
threadpool-crossbeam-channel = { version = "1.8.0", optional = true }
#async-semaphore = { version = "1", optional = true }

serde = { version = "*", features = ["derive", "rc"]}
serde = { version = "*", features = ["derive", "rc"] }
serde_bytes = { version = "0", optional = true }
bincode = { version = "2.0.0-rc.3", features = ["serde"], optional = true }

flume = { version = "0.10", optional = true }
async-channel = { version = "1", optional = true }
twox-hash = { version = "1", optional = true }
serde_bytes = { version = "0.11", optional = true }
fxhash = { version = "0.2", optional = true }
dsrust = { version = "0.1.9", git = "https://github.com/nuno1212s/DSRust", optional = true }
#dsrust = { path = "/home/nunogneto/Documents/Development/Rust/dsrust" }
crossbeam-channel = { version = "0.5.2", optional = true }
crossbeam-skiplist = "0.1.1"
rocksdb = { version = "0.20.1", optional = true }

log = "0.4.17"
env_logger = "0.10.0"
log4rs = { version = "1.1.1", features = ["file_appender"] }
tracing = "*"
#tracing = "0.1.32"
#tracing-subscriber = { version = "0.3.11", features = ["fmt"] }

num-bigint = "0.4.3"
num-traits = "0.2.15"


num-bigint = "*"
num-traits = "*"
event-listener = "*"
39 changes: 22 additions & 17 deletions febft-pbft-consensus/src/bft/consensus/accessory/replica/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use std::time::SystemTime;
use chrono::Utc;
use log::debug;

use tracing::debug;

use atlas_common::node_id::NodeId;
use atlas_common::ordering::{Orderable, SeqNo};
Expand All @@ -22,15 +23,15 @@ use crate::bft::sync::view::ViewInfo;
use crate::bft::{SysMsg, PBFT};

pub struct ReplicaAccessory<RQ>
where
RQ: SerType,
where
RQ: SerType,
{
speculative_commits: Arc<Mutex<BTreeMap<NodeId, StoredSerializedMessage<SysMsg<RQ>>>>>,
}

impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
where
RQ: SerType + 'static,
where
RQ: SerType + 'static,
{
fn handle_partial_pre_prepare<NT>(
&mut self,
Expand All @@ -41,7 +42,8 @@ impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
_node: &NT,
) where
NT: OrderProtocolSendNode<RQ, PBFT<RQ>>,
{}
{
}

fn handle_pre_prepare_phase_completed<NT>(
&mut self,
Expand Down Expand Up @@ -94,7 +96,7 @@ impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
Some(digest),
Some(&*key_pair),
)
.into_inner();
.into_inner();

// store serialized header + message
let serialized = SerializedMessage::new(message.clone(), buf.clone());
Expand Down Expand Up @@ -138,7 +140,8 @@ impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
_node: &NT,
) where
NT: OrderProtocolSendNode<RQ, PBFT<RQ>>,
{}
{
}

fn handle_preparing_quorum<NT>(
&mut self,
Expand Down Expand Up @@ -201,7 +204,8 @@ impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
_node: &NT,
) where
NT: OrderProtocolSendNode<RQ, PBFT<RQ>>,
{}
{
}

fn handle_committing_quorum<NT>(
&mut self,
Expand All @@ -212,21 +216,22 @@ impl<RQ> AccessoryConsensus<RQ> for ReplicaAccessory<RQ>
_node: &NT,
) where
NT: OrderProtocolSendNode<RQ, PBFT<RQ>>,
{}
{
}
}

impl<RQ> Default for ReplicaAccessory<RQ>
where
RQ: SerType,
where
RQ: SerType,
{
fn default() -> Self {
Self::new()
}
}

impl<RQ> ReplicaAccessory<RQ>
where
RQ: SerType,
where
RQ: SerType,
{
pub fn new() -> Self {
Self {
Expand All @@ -247,8 +252,8 @@ fn valid_spec_commits<RQ>(
seq_no: SeqNo,
view: &ViewInfo,
) -> bool
where
RQ: SerType,
where
RQ: SerType,
{
let len = speculative_commits.len();

Expand Down
11 changes: 7 additions & 4 deletions febft-pbft-consensus/src/bft/consensus/decision/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Instant;

use atlas_common::Err;
use chrono::Utc;
use log::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use thiserror::Error;

use atlas_common::error::*;
Expand All @@ -16,7 +16,7 @@ use atlas_communication::message::Header;
use atlas_core::messages::{ClientRqInfo, SessionBased};
use atlas_core::ordering_protocol::networking::OrderProtocolSendNode;
use atlas_core::ordering_protocol::ShareableMessage;
use atlas_core::timeouts::Timeouts;
use atlas_core::timeouts::timeout::TimeoutModHandle;
use atlas_metrics::metrics::metric_duration;

use crate::bft::consensus::accessory::replica::ReplicaAccessory;
Expand Down Expand Up @@ -223,6 +223,7 @@ where
}
}

#[instrument(skip(self), level = "debug")]
pub fn poll(&mut self) -> DecisionPollStatus<RQ> {
match self.phase {
DecisionPhase::Initialize => {
Expand Down Expand Up @@ -271,11 +272,12 @@ where
}

/// Process a message relating to this consensus instance
#[instrument(skip(self, synchronizer, timeouts, node), level = "debug")]
pub fn process_message<NT>(
&mut self,
s_message: ShareableMessage<PBFTMessage<RQ>>,
synchronizer: &Synchronizer<RQ>,
timeouts: &Timeouts,
timeouts: &TimeoutModHandle,
node: &Arc<NT>,
) -> Result<DecisionStatus<RQ>>
where
Expand Down Expand Up @@ -665,6 +667,7 @@ where
}

/// Finalize this consensus decision and return the information about the batch
#[instrument(skip(self), level = "debug")]
pub fn finalize(self) -> Result<CompletedBatch<RQ>> {
if let DecisionPhase::Decided = self.phase {
let seq = self.sequence_number();
Expand Down Expand Up @@ -703,7 +706,7 @@ where
fn request_batch_received<RQ>(
header: &Header,
pre_prepare: &ConsensusMessage<RQ>,
timeouts: &Timeouts,
timeouts: &TimeoutModHandle,
synchronizer: &Synchronizer<RQ>,
log: &WorkingDecisionLog<RQ>,
) -> Vec<ClientRqInfo>
Expand Down
Loading

0 comments on commit f8de08a

Please sign in to comment.