Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: move broadcast logic to task state (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd authored Jul 21, 2023
1 parent a59b773 commit 0fa742a
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 333 deletions.
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl Runtime {
stats,
step,
} => {
println!("OutboundQueryProgressed: {id:?}, {result:?}, {stats:?}, {step:?}");
debug!("OutboundQueryProgressed: {id:?}, {result:?}, {stats:?}, {step:?}");
}

KademliaEvent::InboundRequest { .. } => {}
Expand Down
24 changes: 0 additions & 24 deletions crates/topos-p2p/src/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,6 @@ async fn put_value_in_dht() {
.expect("Unable to create p2p network");

let mut runtime = runtime.bootstrap().await.unwrap();
// runtime
// .swarm
// .behaviour_mut()
// .discovery
// .inner
// .add_address(&peer_1.keypair.public().to_peer_id(), peer_1.addr);

// loop {
// if let Some(SwarmEvent::Behaviour(ComposedEvent::PeerInfo(event))) =
// runtime.swarm.next().await
// {
// println!("Event: {:?}", event);
// if let identify::Event::Received {
// peer_id,
// info: Info { observed_addr, .. },
// } = *event
// {
// println!("Peer {} observed address: {}", peer_id, observed_addr);
// runtime.swarm.add_external_address(observed_addr);
// break;
// }
// }
// }

let kad = &mut runtime.swarm.behaviour_mut().discovery;

let input_key = Key::new(&runtime.local_peer_id.to_string());
Expand Down
1 change: 0 additions & 1 deletion crates/topos-p2p/src/tests/support/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ macro_rules! wait_for_event {
($node:ident, matches: $(|)? $( $pattern:pat_param )|+ $( if $guard: expr )? $(,)?) => {
let assertion = async {
while let Some(event) = $node.next().await {
println!("Event: {:?}", event);
if matches!(event, $( $pattern )|+ $( if $guard )?) {
break;
}
Expand Down
1 change: 0 additions & 1 deletion crates/topos-tce-api/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ pub struct RuntimeContext {

impl Drop for RuntimeContext {
fn drop(&mut self) {
println!("Dropping RuntimeContext");
tracing::warn!("Dropping RuntimeContext");
self.grpc_handler.abort();
self.graphql_handler.abort();
Expand Down
188 changes: 188 additions & 0 deletions crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::time;

use tce_transport::ProtocolEvents;
use tokio::sync::mpsc;
use topos_core::uci::Certificate;
use topos_metrics::DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL;
use topos_p2p::PeerId;
use tracing::{debug, info, warn};

use crate::sampler::SubscriptionsView;

mod status;

pub use status::Status;

pub struct BroadcastState {
subscriptions_view: SubscriptionsView,
status: Status,
certificate: Certificate,
echo_threshold: usize,
ready_threshold: usize,
delivery_threshold: usize,
event_sender: mpsc::Sender<ProtocolEvents>,
delivery_time: time::SystemTime,
}

impl BroadcastState {
pub fn new(
certificate: Certificate,
echo_threshold: usize,
ready_threshold: usize,
delivery_threshold: usize,
event_sender: mpsc::Sender<ProtocolEvents>,
subscriptions_view: SubscriptionsView,
need_gossip: bool,
) -> Self {
let mut state = Self {
subscriptions_view,
status: Status::Pending,
certificate,
echo_threshold,
ready_threshold,
delivery_threshold,
event_sender,
delivery_time: time::SystemTime::now(),
};

_ = state.event_sender.try_send(ProtocolEvents::Broadcast {
certificate_id: state.certificate.id,
});

if need_gossip {
warn!("📣 Gossiping the Certificate {}", &state.certificate.id);
let _ = state.event_sender.try_send(ProtocolEvents::Gossip {
cert: state.certificate.clone(),
});
}

state.update_status();

state
}

pub fn apply_echo(&mut self, peer_id: PeerId) -> Option<Status> {
self.subscriptions_view.echo.remove(&peer_id);
self.update_status()
}

pub fn apply_ready(&mut self, peer_id: PeerId) -> Option<Status> {
self.subscriptions_view.ready.remove(&peer_id);
self.update_status()
}

fn update_status(&mut self) -> Option<Status> {
// Nothing happened yet, we're in the initial state and didn't Procced
// any Echo or Ready messages
// Sending our Echo message
if let Status::Pending = self.status {
_ = self.event_sender.try_send(ProtocolEvents::Echo {
certificate_id: self.certificate.id,
});

self.status = Status::EchoSent;
debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
);
return Some(self.status);
}

// Upon reaching the Echo or Ready threshold, if the status is either
// EchoSent or Delivered (without ReadySent), we send the Ready message
// and update the status accordingly.
// If the status was EchoSent, we update it to ReadySent
// If the status was Delivered, we update it to DeliveredWithReadySent
if !self.status.is_ready_sent() && self.reached_ready_threshold() {
let event = ProtocolEvents::Ready {
certificate_id: self.certificate.id,
};
self.event_sender.try_send(event).unwrap();

self.status = self.status.ready_sent();

debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
);
return Some(self.status);
}

// Upon reaching the Delivery threshold, if the status is not Delivered,
// we update the status to Delivered and change the status
if !self.status.is_delivered() && self.reached_delivery_threshold() {
self.status = self.status.delivered();

debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
);
// Calculate delivery time
let from = self.delivery_time;
let duration = from.elapsed().unwrap();
let d = duration;

info!(
"Certificate {} got delivered in {:?}",
self.certificate.id, d
);

debug!(
"📝 Accepted[{}]\t Delivery time: {:?}",
&self.certificate.id, d
);

DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL.inc();

_ = self
.event_sender
.try_send(ProtocolEvents::CertificateDelivered {
certificate: self.certificate.clone(),
});

return Some(self.status);
}

None
}

fn reached_ready_threshold(&self) -> bool {
// Compute the threshold
let reached_echo_threshold = match self
.subscriptions_view
.network_size
.checked_sub(self.subscriptions_view.echo.len())
{
Some(consumed) => consumed >= self.echo_threshold,
None => false,
};

let reached_ready_threshold = match self
.subscriptions_view
.network_size
.checked_sub(self.subscriptions_view.ready.len())
{
Some(consumed) => consumed >= self.ready_threshold,
None => false,
};

debug!(
"📝 Certificate {} reached Echo threshold: {} and Ready threshold: {}",
&self.certificate.id, reached_echo_threshold, reached_ready_threshold
);
// If reached any of the Echo or Ready thresholds, I send the Ready
reached_echo_threshold || reached_ready_threshold
}

fn reached_delivery_threshold(&self) -> bool {
// If reached the delivery threshold, I can deliver
match self
.subscriptions_view
.network_size
.checked_sub(self.subscriptions_view.ready.len())
{
Some(consumed) => consumed >= self.delivery_threshold,
None => false,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::fmt::Display;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
Pending,
EchoSent,
ReadySent,
DeliveredWithReadySent,
Delivered,
}

impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pending => write!(f, "Pending"),
Self::EchoSent => write!(f, "EchoSent"),
Self::ReadySent => write!(f, "ReadySent"),
Self::DeliveredWithReadySent => write!(f, "DeliveredWithReadySent"),
Self::Delivered => write!(f, "Delivered"),
}
}
}

impl Status {
pub(crate) fn is_ready_sent(&self) -> bool {
matches!(self, Self::ReadySent) || matches!(self, Self::DeliveredWithReadySent)
}

pub(crate) fn is_delivered(&self) -> bool {
matches!(self, Self::Delivered) || matches!(self, Self::DeliveredWithReadySent)
}

pub(crate) fn ready_sent(self) -> Self {
match self {
Self::EchoSent => Self::ReadySent,
Self::Delivered => Self::DeliveredWithReadySent,
_ => self,
}
}

pub(crate) fn delivered(self) -> Self {
match self {
Self::ReadySent => Self::DeliveredWithReadySent,
_ => Self::Delivered,
}
}
}
Loading

0 comments on commit 0fa742a

Please sign in to comment.