Skip to content

Commit

Permalink
Change log to tracing, add some instrumentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
nuno1212s committed Apr 3, 2024
1 parent 4b3e4f2 commit 1094045
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
5 changes: 4 additions & 1 deletion febft-pbft-consensus/src/bft/consensus/decision/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Instant;

use atlas_common::Err;
use chrono::Utc;
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use thiserror::Error;

use atlas_common::error::*;
Expand Down Expand Up @@ -223,6 +223,7 @@ where
}
}

#[instrument(skip(self), level = "debug")]
pub fn poll(&mut self) -> DecisionPollStatus<RQ> {
match self.phase {
DecisionPhase::Initialize => {
Expand Down Expand Up @@ -271,6 +272,7 @@ where
}

/// Process a message relating to this consensus instance
#[instrument(skip(self, synchronizer, timeouts, node), level = "debug")]
pub fn process_message<NT>(
&mut self,
s_message: ShareableMessage<PBFTMessage<RQ>>,
Expand Down Expand Up @@ -665,6 +667,7 @@ where
}

/// Finalize this consensus decision and return the information about the batch
#[instrument(skip(self), level = "debug")]
pub fn finalize(self) -> Result<CompletedBatch<RQ>> {
if let DecisionPhase::Decided = self.phase {
let seq = self.sequence_number();
Expand Down
32 changes: 29 additions & 3 deletions febft-pbft-consensus/src/bft/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::fmt::{Debug, Formatter};

use either::Either;
use event_listener::{Event, Listener};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace, warn, instrument};

use atlas_common::error::*;
use atlas_common::globals::ReadOnly;
Expand Down Expand Up @@ -59,7 +60,6 @@ pub enum ConsensusStatus<O> {
Decided(MaybeVec<OPDecision<O>>),
}

#[derive(Debug)]
/// Represents the status of calling `poll()` on a `Consensus`.
pub enum ConsensusPollStatus<O> {
/// The `Replica` associated with this `Consensus` should
Expand Down Expand Up @@ -270,6 +270,7 @@ where
}

/// Queue a given message into our message queues.
#[instrument(skip(self), level = "debug")]
pub fn queue(&mut self, message: ShareableMessage<PBFTMessage<RQ>>) {
let message_seq = message.message().sequence_number();

Expand Down Expand Up @@ -323,8 +324,9 @@ where
self.signalled.push_signalled(message_seq);
}
}

/// Poll the given consensus
#[instrument(skip_all, level = "debug", ret)]
pub fn poll(&mut self) -> ConsensusPollStatus<RQ> {
trace!("Current signal queue: {:?}", self.signalled);

Expand Down Expand Up @@ -376,6 +378,7 @@ where
ConsensusPollStatus::Recv
}

#[instrument(skip(self, synchronizer, timeouts, node), level = "debug")]
pub fn process_message<NT>(
&mut self,
s_message: ShareableMessage<PBFTMessage<RQ>>,
Expand Down Expand Up @@ -504,6 +507,7 @@ where
}

/// Finalize the next consensus instance if possible
#[instrument(skip(self), level = "debug")]
pub fn finalize(&mut self, view: &ViewInfo) -> Result<Option<CompletedBatch<RQ>>> {
// If the decision can't be finalized, then we can't finalize the batch
if let Some(decision) = self.decisions.front() {
Expand Down Expand Up @@ -535,6 +539,7 @@ where
/// Advance to the next instance of the consensus
/// This will also create the necessary new decision to keep the pending decisions
/// equal to the water mark
#[instrument(skip(self), level = "debug")]
pub fn next_instance(&mut self, view: &ViewInfo) -> ConsensusDecision<RQ> {
let decision = self.decisions.pop_front().unwrap();

Expand Down Expand Up @@ -567,6 +572,7 @@ where
decision
}

#[instrument(skip(self), level = "debug")]
pub fn install_sequence_number(&mut self, novel_seq_no: SeqNo, view: &ViewInfo) {
info!(
"{:?} // Installing sequence number {:?} vs current {:?}",
Expand Down Expand Up @@ -717,6 +723,7 @@ where
}

/// Catch up to the quorums latest decided consensus
#[instrument(skip(self, proof, log), level = "debug", fields(proof_seq = proof.sequence_number().into_u32()))]
pub fn catch_up_to_quorum(
&mut self,
view: &ViewInfo,
Expand All @@ -735,6 +742,7 @@ where

/// Create a fake `PRE-PREPARE`. This is useful during the view
/// change protocol.
#[instrument(skip(self, requests), level = "debug", fields(request_count = requests.len()))]
pub fn forge_propose(&self, requests: Vec<StoredMessage<RQ>>, view: &ViewInfo) -> SysMsg<RQ> {
PBFTMessage::Consensus(ConsensusMessage::new(
self.sequence_number(),
Expand All @@ -744,6 +752,7 @@ where
}

/// Install a given view into the current consensus decisions.
#[instrument(skip(self), level = "debug")]
pub fn install_view(&mut self, view: &ViewInfo) {
let view_index = match view
.sequence_number()
Expand Down Expand Up @@ -830,6 +839,7 @@ where
}

/// Finalize the view change protocol
#[instrument(skip(self, synchronizer, timeouts, node, _log), level = "debug")]
pub fn finalize_view_change<NT>(
&mut self,
(header, message): (Header, ConsensusMessage<RQ>),
Expand Down Expand Up @@ -1101,3 +1111,19 @@ impl Signals {
self.signaled_seq_no.clear();
}
}

impl<O> Debug for ConsensusPollStatus<O> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ConsensusPollStatus::Recv => {
write!(f, "Receive messages")
}
ConsensusPollStatus::NextMessage(_) => {
write!(f, "Message Ready")
}
ConsensusPollStatus::Decided(_) => {
write!(f, "Decided instance")
}
}
}
}
1 change: 1 addition & 0 deletions febft-state-transfer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ anyhow = "1.0"
thiserror = "1.0"
serde = { version = "*", optional = true }
capnp = { version = "0.16.1", optional = true }
lazy_static = "*"
tracing = "*"

atlas-common = { path = "../../Atlas/Atlas-Common" }
Expand Down
7 changes: 6 additions & 1 deletion febft-state-transfer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use lazy_static::lazy_static;
use tracing::{debug, error, info, warn};
#[cfg(feature = "serialize_serde")]
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -43,6 +44,10 @@ pub mod config;
pub mod message;
pub mod metrics;

lazy_static!(
static ref MOD_NAME: Arc<str> = Arc::from("ST_TRANSFER");
);

/// The state of the checkpoint
pub enum CheckpointState<D> {
// no checkpoint has been performed yet
Expand Down Expand Up @@ -235,7 +240,7 @@ where
NT: StateTransferSendNode<CSTMsg<S>> + 'static,
{
fn mod_name() -> Arc<str> {
todo!()
MOD_NAME.clone()
}

fn handle_timeout(&mut self, timeout: Vec<ModTimeout>) -> Result<STTimeoutResult> {
Expand Down

0 comments on commit 1094045

Please sign in to comment.