diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 19879f7be..539a5f8a6 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -32,7 +32,7 @@ rand.workspace = true topos-test-sdk = { path = "../topos-test-sdk/" } [features] -default = [] +task-manager-channels = [] [[bench]] name = "double_echo" diff --git a/crates/topos-tce-broadcast/benches/double_echo.rs b/crates/topos-tce-broadcast/benches/double_echo.rs index a40bf1fda..0be6ccc97 100644 --- a/crates/topos-tce-broadcast/benches/double_echo.rs +++ b/crates/topos-tce-broadcast/benches/double_echo.rs @@ -1,29 +1,17 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; - -mod task_manager_channels; -mod task_manager_futures; +mod task_manager; pub fn criterion_benchmark(c: &mut Criterion) { - let echo_messages = 10; + let certificates = 10_000; let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - c.bench_function("double_echo with channels", |b| { - b.to_async(FuturesExecutor).iter(|| async { - runtime.block_on(async { - task_manager_channels::processing_double_echo(echo_messages).await - }) - }) - }); - - c.bench_function("double_echo with futures", |b| { + c.bench_function("double_echo", |b| { b.to_async(FuturesExecutor).iter(|| async { - runtime.block_on(async { - task_manager_futures::processing_double_echo(echo_messages).await - }) + runtime.block_on(async { task_manager::processing_double_echo(certificates).await }) }) }); } diff --git a/crates/topos-tce-broadcast/benches/task_manager.rs b/crates/topos-tce-broadcast/benches/task_manager.rs new file mode 100644 index 000000000..1945b0601 --- /dev/null +++ b/crates/topos-tce-broadcast/benches/task_manager.rs @@ -0,0 +1,118 @@ +use std::collections::HashSet; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; 
+use topos_tce_broadcast::double_echo::DoubleEcho; +use topos_tce_broadcast::sampler::SubscriptionsView; +use topos_test_sdk::certificates::create_certificate_chain; +use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1}; + +const CHANNEL_SIZE: usize = 256_000; + +struct TceParams { + nb_peers: usize, + broadcast_params: ReliableBroadcastParams, +} + +struct Context { + event_receiver: Receiver<ProtocolEvents>, +} + +pub async fn processing_double_echo(n: u64) { + let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); + + let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); + let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); + let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = + mpsc::channel::<oneshot::Sender<()>>(1); + let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE); + + let params = TceParams { + nb_peers: 10, + broadcast_params: ReliableBroadcastParams { + echo_threshold: 8, + ready_threshold: 5, + delivery_threshold: 8, + }, + }; + + let mut ctx = Context { event_receiver }; + + let mut double_echo = DoubleEcho::new( + params.broadcast_params, + task_manager_message_sender.clone(), + cmd_receiver, + event_sender, + double_echo_shutdown_receiver, + 0, + ); + + // List of peers + let mut peers = HashSet::new(); + for i in 0..params.nb_peers { + let peer = topos_p2p::utils::local_key_pair(Some(i as u8)) + .public() + .to_peer_id(); + peers.insert(peer); + } + + // Subscriptions + double_echo.subscriptions.echo = peers.clone(); + double_echo.subscriptions.ready = peers.clone(); + double_echo.subscriptions.network_size = params.nb_peers; + + let msg = SubscriptionsView { + echo: peers.clone(), + ready: peers.clone(), + network_size: params.nb_peers, + }; + + subscriptions_view_sender.send(msg).await.unwrap(); + + double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver); + + let 
certificates = + create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize); + + let double_echo_selected_echo = double_echo + .subscriptions + .echo + .iter() + .take(double_echo.params.echo_threshold) + .cloned() + .collect::<Vec<_>>(); + + let double_echo_selected_ready = double_echo + .subscriptions + .ready + .iter() + .take(double_echo.params.delivery_threshold) + .cloned() + .collect::<Vec<_>>(); + + for cert in &certificates { + double_echo.broadcast(cert.clone(), true).await; + } + + for cert in &certificates { + for p in &double_echo_selected_echo { + double_echo.handle_echo(*p, cert.id).await; + } + + for p in &double_echo_selected_ready { + double_echo.handle_ready(*p, cert.id).await; + } + } + + let mut count = 0; + + while let Some(event) = ctx.event_receiver.recv().await { + if let ProtocolEvents::CertificateDelivered { .. } = event { + count += 1; + + if count == n { + break; + } + } + } +} diff --git a/crates/topos-tce-broadcast/benches/task_manager_channels.rs b/crates/topos-tce-broadcast/benches/task_manager_channels.rs deleted file mode 100644 index b24309e18..000000000 --- a/crates/topos-tce-broadcast/benches/task_manager_channels.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::collections::HashMap; - -use rand::Rng; -use tokio::spawn; -use tokio::sync::mpsc; - -use topos_core::uci::CertificateId; -use topos_p2p::PeerId; -use topos_tce_broadcast::task_manager_channels::{TaskManager, Thresholds}; -use topos_tce_broadcast::DoubleEchoCommand; - -pub async fn processing_double_echo(n: u64) { - let (message_sender, message_receiver) = mpsc::channel(1024); - let (task_completion_sender, task_completion_receiver) = mpsc::channel(1024); - let (event_sender, mut event_receiver) = mpsc::channel(1024); - - let task_manager = TaskManager { - message_receiver, - task_completion: task_completion_receiver, - task_context: HashMap::new(), - thresholds: Thresholds { - echo: n as usize, - ready: n as usize, - delivery: n as usize, - }, - }; - - 
spawn(task_manager.run(task_completion_sender, event_sender)); - - let mut certificates = vec![]; - - let mut rng = rand::thread_rng(); - - for _ in 0..10_000 { - let mut id = [0u8; 32]; - rng.fill(&mut id); - let cert_id = CertificateId::from_array(id); - certificates.push(cert_id); - } - - for certificate_id in certificates { - for _ in 0..n { - let echo = DoubleEchoCommand::Echo { - from_peer: PeerId::random(), - certificate_id, - }; - - message_sender.send(echo).await.unwrap(); - } - } - - let mut count = 0; - - while (event_receiver.recv().await).is_some() { - count += 1; - - if count == n { - break; - } - } -} diff --git a/crates/topos-tce-broadcast/benches/task_manager_futures.rs b/crates/topos-tce-broadcast/benches/task_manager_futures.rs deleted file mode 100644 index 52225e22c..000000000 --- a/crates/topos-tce-broadcast/benches/task_manager_futures.rs +++ /dev/null @@ -1,62 +0,0 @@ -use futures::stream::FuturesUnordered; -use rand::Rng; -use tokio::spawn; -use tokio::sync::mpsc; -use topos_core::uci::CertificateId; -use topos_p2p::PeerId; -use topos_tce_broadcast::DoubleEchoCommand; - -use topos_tce_broadcast::task_manager_futures::{TaskManager, Thresholds}; - -pub async fn processing_double_echo(n: u64) { - let (message_sender, message_receiver) = mpsc::channel(1024); - let (task_completion_sender, mut task_completion_receiver) = mpsc::channel(48_000); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - let task_manager = TaskManager { - message_receiver, - task_completion_sender, - tasks: Default::default(), - running_tasks: FuturesUnordered::new(), - thresholds: Thresholds { - echo: n as usize, - ready: n as usize, - delivery: n as usize, - }, - shutdown_sender, - }; - - spawn(task_manager.run(shutdown_receiver)); - - let mut certificates = vec![]; - - let mut rng = rand::thread_rng(); - - for _ in 0..10_000 { - let mut id = [0u8; 32]; - rng.fill(&mut id); - let cert_id = CertificateId::from_array(id); - certificates.push(cert_id); - } - 
- for certificate_id in certificates { - for _ in 0..n { - let echo = DoubleEchoCommand::Echo { - from_peer: PeerId::random(), - certificate_id, - }; - - message_sender.send(echo).await.unwrap(); - } - } - - let mut count = 0; - - while let Some((_, _)) = task_completion_receiver.recv().await { - count += 1; - - if count == n { - break; - } - } -} diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index ab5942117..01dbfc5d6 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -7,6 +7,30 @@ lazy_static! { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(2048); + /// Size of the channel between double echo and the task manager + pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = + std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(20_480); + /// Size of the channel to send protocol events from the double echo + pub static ref PROTOCOL_CHANNEL_SIZE: usize = + std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(2048); + /// Size of the channel to send updated subscriptions views to the double echo + pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize = + std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(2048); + /// Size of the channel on which broadcast tasks report their completion status to the task manager + pub static ref BROADCAST_TASK_COMPLETION_CHANNEL_SIZE: usize = + std::env::var("BROADCAST_TASK_COMPLETION_CHANNEL_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(2048); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) diff --git a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs index 62079a68f..64e33fa0d 100644 --- 
a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs +++ b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs @@ -13,6 +13,7 @@ mod status; pub use status::Status; +#[derive(Debug)] pub struct BroadcastState { subscriptions_view: SubscriptionsView, status: Status, @@ -97,7 +98,9 @@ impl BroadcastState { let event = ProtocolEvents::Ready { certificate_id: self.certificate.id, }; - self.event_sender.try_send(event).unwrap(); + if let Err(e) = self.event_sender.try_send(event) { + warn!("Error sending Ready message: {}", e); + } self.status = self.status.ready_sent(); diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index c630a29db..61df3c6ec 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -1,68 +1,30 @@ -use crate::constant; +use crate::TaskStatus; use crate::{DoubleEchoCommand, SubscriptionsView}; use std::collections::HashSet; -use std::collections::{HashMap, VecDeque}; use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use tokio::sync::{mpsc, oneshot}; use topos_core::uci::{Certificate, CertificateId}; -use topos_metrics::{ - CERTIFICATE_RECEIVED_FROM_API_TOTAL, CERTIFICATE_RECEIVED_FROM_GOSSIP_TOTAL, - CERTIFICATE_RECEIVED_TOTAL, DOUBLE_ECHO_BROADCAST_CREATED_TOTAL, - DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT, DOUBLE_ECHO_BUFFER_CAPACITY_TOTAL, - DOUBLE_ECHO_CURRENT_BUFFER_SIZE, -}; -use topos_p2p::PeerId; -use tracing::{debug, error, info, warn, warn_span, Span}; - -use self::broadcast_state::BroadcastState; -mod broadcast_state; +use topos_p2p::PeerId; +use tracing::{error, info, warn}; -/// Processing data associated to a Certificate candidate for delivery -/// Sample repartition, one peer may belongs to multiple samples -#[derive(Clone)] -pub struct DeliveryState { - pub subscriptions: SubscriptionsView, - pub ready_sent: bool, - pub delivered: bool, -} +pub mod broadcast_state; pub struct 
DoubleEcho { /// Channel to receive commands command_receiver: mpsc::Receiver<DoubleEchoCommand>, - /// Channel to receive subscriptions updates - subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>, /// Channel to send events event_sender: mpsc::Sender<ProtocolEvents>, - /// Channel to receive shutdown signal pub(crate) shutdown: mpsc::Receiver<oneshot::Sender<()>>, - - /// pending certificates state - pending_certificate_count: u64, - /// buffer of certificates to process - buffer: VecDeque<(bool, Certificate)>, - - /// known certificate ids to avoid processing twice the same certificate - known_certificates: HashSet<CertificateId>, - - /// delivered certificate ids to avoid processing twice the same certificate + /// Delivered certificate ids to avoid processing twice the same certificate delivered_certificates: HashSet<CertificateId>, - - pub(crate) params: ReliableBroadcastParams, - - /// Current certificates being processed - cert_candidate: HashMap<CertificateId, BroadcastState>, - - /// Span tracker for each certificate - span_tracker: HashMap<CertificateId, Span>, - - pub(crate) subscriptions: SubscriptionsView, // My subscriptions for echo, ready and delivery feedback - - local_peer_id: String, - - /// Buffer of messages to be processed once the certificate payload is received - buffered_messages: HashMap<CertificateId, Vec<DoubleEchoCommand>>, + /// The threshold parameters for the double echo + pub params: ReliableBroadcastParams, + /// The connection to the TaskManager to forward DoubleEchoCommand messages + task_manager_message_sender: mpsc::Sender<DoubleEchoCommand>, + /// The overview of the network, which holds echo and ready subscriptions and the network size + pub subscriptions: SubscriptionsView, } impl DoubleEcho { @@ -71,31 +33,65 @@ impl DoubleEcho { #[allow(clippy::too_many_arguments)] pub fn new( params: ReliableBroadcastParams, + task_manager_message_sender: mpsc::Sender<DoubleEchoCommand>, command_receiver: 
mpsc::Receiver<DoubleEchoCommand>, - subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>, event_sender: mpsc::Sender<ProtocolEvents>, shutdown: mpsc::Receiver<oneshot::Sender<()>>, - local_peer_id: String, - pending_certificate_count: u64, + _pending_certificate_count: u64, ) -> Self { Self { - pending_certificate_count, params, + task_manager_message_sender, command_receiver, - subscriptions_view_receiver, event_sender, - cert_candidate: Default::default(), - span_tracker: Default::default(), subscriptions: SubscriptionsView::default(), - buffer: VecDeque::new(), shutdown, - local_peer_id, - buffered_messages: Default::default(), delivered_certificates: Default::default(), - known_certificates: Default::default(), } } + #[cfg(not(feature = "task-manager-channels"))] + pub fn spawn_task_manager( + &mut self, + subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>, + task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>, + ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { + let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); + + let (task_manager, shutdown_receiver) = crate::task_manager_futures::TaskManager::new( + task_manager_message_receiver, + task_completion_sender, + subscriptions_view_receiver, + self.event_sender.clone(), + self.params.clone(), + ); + + tokio::spawn(task_manager.run(shutdown_receiver)); + + task_completion_receiver + } + + #[cfg(feature = "task-manager-channels")] + pub fn spawn_task_manager( + &mut self, + subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>, + task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>, + ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { + let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); + + let (task_manager, shutdown_receiver) = crate::task_manager_channels::TaskManager::new( + task_manager_message_receiver, + task_completion_sender, + subscriptions_view_receiver, + self.event_sender.clone(), + 
self.params.clone(), + ); + + tokio::spawn(task_manager.run(shutdown_receiver)); + + task_completion_receiver + } + /// DoubleEcho main loop /// - Listen for shutdown signal /// - Read new messages from command_receiver @@ -103,11 +99,28 @@ impl DoubleEcho { /// - If a new subscription view is received, update the subscriptions /// - If a new Echo/Ready is received, update the state of the certificate or buffer /// the message - pub(crate) async fn run(mut self) { + pub(crate) async fn run( + mut self, + mut subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>, + task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>, + ) { + let (forwarding_subscriptions_sender, forwarding_subscriptions_receiver) = + mpsc::channel(2048); + let mut task_completion = self.spawn_task_manager( + forwarding_subscriptions_receiver, + task_manager_message_receiver, + ); + info!("DoubleEcho started"); let shutdowned: Option<oneshot::Sender<()>> = loop { tokio::select! { + biased; + + Some(new_subscriptions_view) = subscriptions_view_receiver.recv() => { + forwarding_subscriptions_sender.send(new_subscriptions_view.clone()).await.unwrap(); + self.subscriptions = new_subscriptions_view; + } shutdown = self.shutdown.recv() => { warn!("Double echo shutdown signal received {:?}", shutdown); @@ -116,63 +129,34 @@ impl DoubleEcho { Some(command) = self.command_receiver.recv() => { match command { - DoubleEchoCommand::Broadcast { need_gossip, cert } => self.handle_broadcast(cert,need_gossip), + DoubleEchoCommand::Broadcast { need_gossip, cert } => self.broadcast(cert, need_gossip).await, command if self.subscriptions.is_some() => { match command { - DoubleEchoCommand::Echo { from_peer, certificate_id } => self.handle_echo(from_peer, certificate_id), - DoubleEchoCommand::Ready { from_peer, certificate_id } => self.handle_ready(from_peer, certificate_id), + DoubleEchoCommand::Echo { from_peer, certificate_id } => self.handle_echo(from_peer, certificate_id).await, + 
DoubleEchoCommand::Ready { from_peer, certificate_id } => self.handle_ready(from_peer, certificate_id).await, _ => {} } - } + }, command => { warn!("Received a command {command:?} while not having a complete sampling"); } } } - Some(new_subscriptions_view) = self.subscriptions_view_receiver.recv() => { - info!("Starting to use the new operational set of samples: {:?}", &new_subscriptions_view); - self.subscriptions = new_subscriptions_view; + Some((certificate_id, status)) = task_completion.recv() => { + if status == TaskStatus::Success { + self.delivered_certificates.insert(certificate_id); + } } + else => { warn!("Break the tokio loop for the double echo"); break None; } }; - - // Broadcast next certificate - if self.subscriptions.is_some() { - if let Some((need_gossip, cert)) = self.buffer.pop_front() { - DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec(); - let certificate_id = cert.id; - - self.broadcast(cert, need_gossip); - - if let Some(messages) = self.buffered_messages.remove(&certificate_id) { - for message in messages { - DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec(); - match message { - DoubleEchoCommand::Echo { - from_peer, - certificate_id, - } => { - self.consume_echo(from_peer, &certificate_id); - } - DoubleEchoCommand::Ready { - from_peer, - certificate_id, - } => { - self.consume_ready(from_peer, &certificate_id); - } - _ => {} - } - } - } - } - } }; if let Some(sender) = shutdowned { @@ -188,9 +172,8 @@ impl DoubleEcho { /// Called to process potentially new certificate: /// - either submitted from API ( [tce_transport::TceCommands::Broadcast] command) /// - or received through the gossip (first step of protocol exchange) - pub(crate) fn broadcast(&mut self, cert: Certificate, origin: bool) { + pub async fn broadcast(&mut self, cert: Certificate, origin: bool) { info!("🙌 Starting broadcasting the Certificate {}", &cert.id); - if self.cert_pre_broadcast_check(&cert).is_err() { error!("Failure on the pre-check for the Certificate {}", &cert.id); self.event_sender 
@@ -200,15 +183,6 @@ impl DoubleEcho { .unwrap(); return; } - // Don't gossip one cert already gossiped - if self.cert_candidate.contains_key(&cert.id) { - self.event_sender - .try_send(ProtocolEvents::BroadcastFailed { - certificate_id: cert.id, - }) - .unwrap(); - return; - } if self.delivered_certificates.get(&cert.id).is_some() { self.event_sender @@ -220,28 +194,22 @@ impl DoubleEcho { return; } - // Trigger event of new certificate candidate for delivery - let certificate_id = cert.id; - // To include tracing context in client requests from _this_ app, - // use `context` to extract the current OpenTelemetry context. - // Add new entry for the new Cert candidate - match self.delivery_state_for_new_cert(cert, origin) { - Some(delivery_state) => { - self.cert_candidate.insert(certificate_id, delivery_state); - } - None => { - error!("Ill-formed samples"); - _ = self.event_sender.try_send(ProtocolEvents::Die); - } + if self + .delivery_state_for_new_cert(cert, origin) + .await + .is_none() + { + error!("Ill-formed samples"); + _ = self.event_sender.try_send(ProtocolEvents::Die); } } /// Build initial delivery state - fn delivery_state_for_new_cert( + async fn delivery_state_for_new_cert( &mut self, certificate: Certificate, origin: bool, - ) -> Option<BroadcastState> { + ) -> Option<bool> { let subscriptions = self.subscriptions.clone(); // Check whether inbound sets are empty @@ -253,15 +221,15 @@ impl DoubleEcho { ); None } else { - Some(BroadcastState::new( - certificate, - self.params.echo_threshold, - self.params.ready_threshold, - self.params.delivery_threshold, - self.event_sender.clone(), - subscriptions, - origin, - )) + _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Broadcast { + need_gossip: origin, + cert: certificate, + }) + .await; + + Some(true) } } @@ -280,122 +248,27 @@ impl DoubleEcho { } impl DoubleEcho { - pub(crate) fn handle_broadcast(&mut self, cert: Certificate, need_gossip: bool) { - if 
!self.known_certificates.contains(&cert.id) { - let span = warn_span!( - "Broadcast", - peer_id = self.local_peer_id, - certificate_id = cert.id.to_string() - ); - DOUBLE_ECHO_BROADCAST_CREATED_TOTAL.inc(); - span.in_scope(|| { - warn!("Broadcast registered for {}", cert.id); - self.span_tracker.insert(cert.id, span.clone()); - CERTIFICATE_RECEIVED_TOTAL.inc(); - - if need_gossip { - CERTIFICATE_RECEIVED_FROM_API_TOTAL.inc(); - } else { - CERTIFICATE_RECEIVED_FROM_GOSSIP_TOTAL.inc(); - } - }); - - self.known_certificates.insert(cert.id); - span.in_scope(|| { - debug!("DoubleEchoCommand::Broadcast certificate_id: {}", cert.id); - if self.buffer.len() < *constant::TOPOS_DOUBLE_ECHO_MAX_BUFFER_SIZE { - self.buffer.push_back((need_gossip, cert)); - DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); - } else { - DOUBLE_ECHO_BUFFER_CAPACITY_TOTAL.inc(); - // Adding one to the pending_certificate_count because we - // can't buffer it right now - _ = self.pending_certificate_count.checked_add(1); - } - }); - } - } - - pub(crate) fn handle_echo(&mut self, from_peer: PeerId, certificate_id: CertificateId) { - let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some(); - if !cert_delivered { - if self.known_certificates.get(&certificate_id).is_some() { - debug!( - "Handling DoubleEchoCommand::Echo from_peer: {} cert_id: {}", - &from_peer, certificate_id - ); - if let Some(status) = self.consume_echo(from_peer, &certificate_id) { - if status.is_delivered() { - self.delivered_certificates.insert(certificate_id); - self.span_tracker.remove(&certificate_id); - self.cert_candidate.remove(&certificate_id); - } - } - - // need to deliver the certificate - } else if self.delivered_certificates.get(&certificate_id).is_none() { - // need to buffer the Echo - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(DoubleEchoCommand::Echo { - from_peer, - certificate_id, - }); - DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.inc(); - } + pub async fn handle_echo(&mut self, 
from_peer: PeerId, certificate_id: CertificateId) { + if self.delivered_certificates.get(&certificate_id).is_none() { + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Echo { + from_peer, + certificate_id, + }) + .await; } } - pub(crate) fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) { - let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some(); - if !cert_delivered { - if self.known_certificates.get(&certificate_id).is_some() { - debug!( - "Handling DoubleEchoCommand::Ready from_peer: {} cert_id: {}", - &from_peer, &certificate_id - ); - - if let Some(status) = self.consume_ready(from_peer, &certificate_id) { - if status.is_delivered() { - self.delivered_certificates.insert(certificate_id); - self.span_tracker.remove(&certificate_id); - self.cert_candidate.remove(&certificate_id); - } - } - - // need to deliver the certificate - } else if self.delivered_certificates.get(&certificate_id).is_none() { - // need to buffer the Ready - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(DoubleEchoCommand::Ready { - from_peer, - certificate_id, - }); - DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.inc(); - } + pub async fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) { + if self.delivered_certificates.get(&certificate_id).is_none() { + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Ready { + from_peer, + certificate_id, + }) + .await; } } - - pub(crate) fn consume_echo( - &mut self, - from_peer: PeerId, - certificate_id: &CertificateId, - ) -> Option<broadcast_state::Status> { - self.cert_candidate - .get_mut(certificate_id) - .and_then(|state| state.apply_echo(from_peer)) - } - - pub(crate) fn consume_ready( - &mut self, - from_peer: PeerId, - certificate_id: &CertificateId, - ) -> Option<broadcast_state::Status> { - self.cert_candidate - .get_mut(certificate_id) - .and_then(|state| state.apply_ready(from_peer)) - } } diff --git 
a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index 43b3881e1..da6c298d1 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -23,7 +23,6 @@ use topos_p2p::PeerId; use topos_tce_storage::StorageClient; use tracing::{debug, error, info}; -use crate::sampler::SubscriptionsView; pub use topos_core::uci; pub type Peer = String; @@ -32,11 +31,24 @@ mod constant; pub mod double_echo; pub mod sampler; +#[cfg(feature = "task-manager-channels")] pub mod task_manager_channels; +#[cfg(not(feature = "task-manager-channels"))] pub mod task_manager_futures; + #[cfg(test)] mod tests; +use crate::sampler::SubscriptionsView; + +#[derive(Debug, PartialEq, Eq)] +pub enum TaskStatus { + /// The task finished successfully and broadcasted the certificate + received ready + Success, + /// The task did not finish successfully and stopped. + Failure, +} + /// Configuration of TCE implementation pub struct ReliableBroadcastConfig { pub tce_params: ReliableBroadcastParams, @@ -59,7 +71,7 @@ pub enum SamplerCommand { ForceResample, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum DoubleEchoCommand { /// Entry point for new certificate to submit as initial sender Broadcast { @@ -95,16 +107,19 @@ impl ReliableBroadcastClient { /// Aggregate is spawned as new task. 
pub async fn new( config: ReliableBroadcastConfig, - local_peer_id: String, + _local_peer_id: String, storage: StorageClient, ) -> (Self, impl Stream<Item = ProtocolEvents>) { let (subscriptions_view_sender, subscriptions_view_receiver) = - mpsc::channel::<SubscriptionsView>(2048); - let (event_sender, event_receiver) = mpsc::channel(2048); + mpsc::channel::<SubscriptionsView>(*constant::SUBSCRIPTION_VIEW_CHANNEL_SIZE); + let (event_sender, event_receiver) = mpsc::channel(*constant::PROTOCOL_CHANNEL_SIZE); let (command_sender, command_receiver) = mpsc::channel(*constant::COMMAND_CHANNEL_SIZE); let (double_echo_shutdown_channel, double_echo_shutdown_receiver) = mpsc::channel::<oneshot::Sender<()>>(1); + let (task_manager_message_sender, task_manager_message_receiver) = + mpsc::channel(*constant::BROADCAST_TASK_MANAGER_CHANNEL_SIZE); + let pending_certificate_count = storage .get_pending_certificates() .await @@ -113,15 +128,14 @@ impl ReliableBroadcastClient { let double_echo = DoubleEcho::new( config.tce_params, + task_manager_message_sender, command_receiver, - subscriptions_view_receiver, event_sender, double_echo_shutdown_receiver, - local_peer_id, pending_certificate_count, ); - spawn(double_echo.run()); + spawn(double_echo.run(subscriptions_view_receiver, task_manager_message_receiver)); ( Self { @@ -142,6 +156,7 @@ impl ReliableBroadcastClient { network_size: set.len(), }) .await + .map(|_| ()) .map_err(|_| ()) } diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs index f39120dd7..5fcb08bc4 100644 --- a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs @@ -1,96 +1,136 @@ use std::collections::HashMap; use tokio::{spawn, sync::mpsc}; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use topos_core::uci::CertificateId; +use tracing::warn; pub mod task; - -use crate::DoubleEchoCommand; -use 
task::{Task, TaskCompletion, TaskContext}; - -#[derive(Clone)] -pub struct Thresholds { - pub echo: usize, - pub ready: usize, - pub delivery: usize, -} +use crate::double_echo::broadcast_state::BroadcastState; +use crate::sampler::SubscriptionsView; +use crate::TaskStatus; +use crate::{constant, DoubleEchoCommand}; +use task::{Task, TaskContext}; /// The TaskManager is responsible for receiving messages from the network and distributing them /// among tasks. These tasks are either created if none for a certain CertificateID exists yet, /// or existing tasks will receive the messages. pub struct TaskManager { pub message_receiver: mpsc::Receiver<DoubleEchoCommand>, - pub task_completion: mpsc::Receiver<TaskCompletion>, - pub task_context: HashMap<CertificateId, TaskContext>, - pub thresholds: Thresholds, + pub task_completion_receiver: mpsc::Receiver<(CertificateId, TaskStatus)>, + pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, + pub notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, + pub subscription_view_receiver: mpsc::Receiver<SubscriptionsView>, + pub subscriptions: SubscriptionsView, + pub event_sender: mpsc::Sender<ProtocolEvents>, + pub tasks: HashMap<CertificateId, TaskContext>, + pub buffered_messages: HashMap<CertificateId, Vec<DoubleEchoCommand>>, + pub thresholds: ReliableBroadcastParams, + pub shutdown_sender: mpsc::Sender<()>, } impl TaskManager { - pub async fn run( - mut self, - task_completion_sender: mpsc::Sender<TaskCompletion>, - event_sender: mpsc::Sender<task::Events>, - ) { + pub fn new( + message_receiver: mpsc::Receiver<DoubleEchoCommand>, + notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, + subscription_view_receiver: mpsc::Receiver<SubscriptionsView>, + event_sender: mpsc::Sender<ProtocolEvents>, + thresholds: ReliableBroadcastParams, + ) -> (Self, mpsc::Receiver<()>) { + let (task_completion_sender, task_completion_receiver) = + 
mpsc::channel(*constant::BROADCAST_TASK_COMPLETION_CHANNEL_SIZE); + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); + + ( + Self { + message_receiver, + task_completion_receiver, + task_completion_sender, + notify_task_completion, + subscription_view_receiver, + subscriptions: SubscriptionsView::default(), + event_sender, + tasks: HashMap::new(), + buffered_messages: Default::default(), + thresholds, + shutdown_sender, + }, + shutdown_receiver, + ) + } + + pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { loop { tokio::select! { - // If a task sends a message over the completion channel, it is signalling that it - // is done and can be removed from the open tasks inside `task_context` - Some(task_completion) = self.task_completion.recv() => { - match task_completion.success { - true => { - self.task_context.remove(&task_completion.certificate_id); - } - false => { - self.task_context.remove(&task_completion.certificate_id); - } - } + biased; + + Some(new_subscriptions_view) = self.subscription_view_receiver.recv() => { + self.subscriptions = new_subscriptions_view; } Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready{ certificate_id, .. } => { - let task_context = match self.task_context.get(&certificate_id) { - Some(task_context) => task_context.to_owned(), - None => self.create_and_spawn_new_task(certificate_id, task_completion_sender.clone(), event_sender.clone()), - }; - - Self::send_message_to_task(task_context, msg).await; + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages.entry(certificate_id).or_default().push(msg); + } } - DoubleEchoCommand::Broadcast { ref cert, .. 
} => { - if self.task_context.get(&cert.id).is_none() { - let task_context = self.create_and_spawn_new_task(cert.id, task_completion_sender.clone(), event_sender.clone()); - Self::send_message_to_task(task_context, msg).await; + DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { + match self.tasks.entry(cert.id) { + std::collections::hash_map::Entry::Vacant(entry) => { + let broadcast_state = BroadcastState::new( + cert.clone(), + self.thresholds.echo_threshold, + self.thresholds.ready_threshold, + self.thresholds.delivery_threshold, + self.event_sender.clone(), + self.subscriptions.clone(), + need_gossip, + ); + + let (task, task_context) = Task::new(cert.id, self.task_completion_sender.clone(), broadcast_state); + + spawn(task.run()); + + entry.insert(task_context); + } + std::collections::hash_map::Entry::Occupied(_) => {}, } } } } - } - } - } - fn create_and_spawn_new_task( - &mut self, - certificate_id: CertificateId, - task_completion_sender: mpsc::Sender<TaskCompletion>, - event_sender: mpsc::Sender<task::Events>, - ) -> TaskContext { - let (task, context) = Task::new( - certificate_id, - task_completion_sender, - event_sender, - self.thresholds.clone(), - ); + Some((certificate_id, status)) = self.task_completion_receiver.recv() => { + self.tasks.remove(&certificate_id); + let _ = self.notify_task_completion.send((certificate_id, status)).await; + } + + _ = shutdown_receiver.recv() => { + warn!("Task Manager shutting down"); - spawn(task.run()); + for task in self.tasks.iter() { + task.1.shutdown_sender.send(()).await.unwrap(); + } - self.task_context.insert(certificate_id, context.clone()); + break; + } + } - context + for (certificate_id, messages) in &mut self.buffered_messages { + if let Some(task) = self.tasks.get(certificate_id) { + for msg in messages { + _ = task.sink.send(msg.clone()).await; + } + } + } + } } +} - async fn send_message_to_task(task_context: TaskContext, msg: DoubleEchoCommand) { - spawn(async move { - _ = 
task_context.message_sender.send(msg).await; - }); +impl Drop for TaskManager { + fn drop(&mut self) { + _ = self.shutdown_sender.try_send(()); } } diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs b/crates/topos-tce-broadcast/src/task_manager_channels/task.rs index 8cec847d5..540042555 100644 --- a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs +++ b/crates/topos-tce-broadcast/src/task_manager_channels/task.rs @@ -1,102 +1,84 @@ use tokio::sync::mpsc; -use topos_core::uci::CertificateId; - -use crate::task_manager_channels::Thresholds; +use crate::double_echo::broadcast_state::{BroadcastState, Status}; use crate::DoubleEchoCommand; +use crate::TaskStatus; +use topos_core::uci::CertificateId; -#[derive(Debug, PartialEq, Eq)] -pub enum Events { - ReachedThresholdOfReady(CertificateId), - ReceivedEcho(CertificateId), - TimeOut(CertificateId), -} - -#[derive(Debug)] -pub struct TaskCompletion { - pub(crate) success: bool, - pub(crate) certificate_id: CertificateId, -} - -impl TaskCompletion { - fn success(certificate_id: CertificateId) -> Self { - TaskCompletion { - success: true, - certificate_id, - } - } - - #[allow(dead_code)] - fn failure(certificate_id: CertificateId) -> Self { - TaskCompletion { - success: false, - certificate_id, - } - } -} - -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct TaskContext { - pub certificate_id: CertificateId, - pub message_sender: mpsc::Sender<DoubleEchoCommand>, + pub sink: mpsc::Sender<DoubleEchoCommand>, + pub shutdown_sender: mpsc::Sender<()>, } pub struct Task { pub message_receiver: mpsc::Receiver<DoubleEchoCommand>, pub certificate_id: CertificateId, - pub completion_sender: mpsc::Sender<TaskCompletion>, - pub event_sender: mpsc::Sender<Events>, - pub thresholds: Thresholds, + pub completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, + pub broadcast_state: BroadcastState, + pub shutdown_receiver: mpsc::Receiver<()>, } impl Task { pub fn new( certificate_id: CertificateId, 
- completion_sender: mpsc::Sender<TaskCompletion>, - event_sender: mpsc::Sender<Events>, - thresholds: Thresholds, + completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, + broadcast_state: BroadcastState, ) -> (Self, TaskContext) { let (message_sender, message_receiver) = mpsc::channel(1024); + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); + let task_context = TaskContext { - certificate_id, - message_sender, + sink: message_sender, + shutdown_sender, }; let task = Task { message_receiver, certificate_id, completion_sender, - event_sender, - thresholds, + broadcast_state, + shutdown_receiver, }; (task, task_context) } - async fn handle_msg(&mut self, msg: DoubleEchoCommand) -> Result<bool, ()> { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } => { - let _ = self - .completion_sender - .send(TaskCompletion::success(certificate_id)) - .await; - - let _ = self - .event_sender - .send(Events::ReachedThresholdOfReady(self.certificate_id)) - .await; - - Ok(true) - } - DoubleEchoCommand::Ready { .. } => Ok(true), - DoubleEchoCommand::Broadcast { .. } => Ok(false), - } - } - pub(crate) async fn run(mut self) { loop { tokio::select! { - Some(msg) = self.message_receiver.recv() => if let Ok(true) = self.handle_msg(msg).await { + Some(msg) = self.message_receiver.recv() => { + match msg { + DoubleEchoCommand::Echo { from_peer, .. } => { + if let Some(Status::DeliveredWithReadySent) = + self.broadcast_state.apply_echo(from_peer) + { + let _ = self + .completion_sender + .send((self.certificate_id, TaskStatus::Success)) + .await; + + break; + } + } + DoubleEchoCommand::Ready { from_peer, .. 
} => { + if let Some(Status::DeliveredWithReadySent) = + self.broadcast_state.apply_ready(from_peer) + { + let _ = self + .completion_sender + .send((self.certificate_id, TaskStatus::Success)) + .await; + + break; + } + } + _ => {} + } + } + + _ = self.shutdown_receiver.recv() => { + println!("Received shutdown, shutting down task {:?}", self.certificate_id); break; } } diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs b/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs index 3cf81ed4c..f7376bfa9 100644 --- a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs @@ -4,22 +4,18 @@ use futures::StreamExt; use std::collections::HashMap; use std::future::IntoFuture; use std::pin::Pin; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use tokio::sync::mpsc; -use tracing::warn; - use topos_core::uci::CertificateId; +use tracing::warn; pub mod task; +use crate::double_echo::broadcast_state::BroadcastState; +use crate::sampler::SubscriptionsView; use crate::DoubleEchoCommand; -use task::{Task, TaskContext, TaskStatus}; - -#[derive(Clone)] -pub struct Thresholds { - pub echo: usize, - pub ready: usize, - pub delivery: usize, -} +use crate::TaskStatus; +use task::{Task, TaskContext}; /// The TaskManager is responsible for receiving messages from the network and distributing them /// among tasks. 
These tasks are either created if none for a certain CertificateID exists yet, @@ -27,66 +23,103 @@ pub struct Thresholds { pub struct TaskManager { pub message_receiver: mpsc::Receiver<DoubleEchoCommand>, pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, + pub subscription_view_receiver: mpsc::Receiver<SubscriptionsView>, + pub subscriptions: SubscriptionsView, + pub event_sender: mpsc::Sender<ProtocolEvents>, pub tasks: HashMap<CertificateId, TaskContext>, #[allow(clippy::type_complexity)] pub running_tasks: FuturesUnordered< Pin<Box<dyn Future<Output = (CertificateId, TaskStatus)> + Send + 'static>>, >, - pub thresholds: Thresholds, + pub buffered_messages: HashMap<CertificateId, Vec<DoubleEchoCommand>>, + pub thresholds: ReliableBroadcastParams, pub shutdown_sender: mpsc::Sender<()>, } impl TaskManager { + pub fn new( + message_receiver: mpsc::Receiver<DoubleEchoCommand>, + task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, + subscription_view_receiver: mpsc::Receiver<SubscriptionsView>, + event_sender: mpsc::Sender<ProtocolEvents>, + thresholds: ReliableBroadcastParams, + ) -> (Self, mpsc::Receiver<()>) { + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); + + ( + Self { + message_receiver, + task_completion_sender, + subscription_view_receiver, + subscriptions: SubscriptionsView::default(), + event_sender, + tasks: HashMap::new(), + running_tasks: FuturesUnordered::new(), + buffered_messages: Default::default(), + thresholds, + shutdown_sender, + }, + shutdown_receiver, + ) + } + pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { loop { tokio::select! 
{ - // We receive a new DoubleEchoCommand from the outside through a channel receiver - // The base state is that there is no running task yet - // What we need to do is to - // a) create a new task in the local HashMap: So we can check if incoming messages already have an open task - // b) Add the task to the FuturesUnordered stream: So we can check if the task is done - // The task future has to be started for it to be able to listen on the it's own message receiver. + biased; + + Some(new_subscriptions_view) = self.subscription_view_receiver.recv() => { + self.subscriptions = new_subscriptions_view; + } + Some(msg) = self.message_receiver.recv() => { match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, ..} => { - let task = match self.tasks.entry(certificate_id) { - std::collections::hash_map::Entry::Vacant(entry) => { - let (task, task_context) = Task::new(certificate_id, self.thresholds.clone()); - self.running_tasks.push(task.into_future()); - - entry.insert(task_context) - } - std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(), + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); }; - - _ = task.sink.send(msg).await; } - DoubleEchoCommand::Broadcast { ref cert, .. 
} => { - let task = match self.tasks.entry(cert.id) { + DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { + match self.tasks.entry(cert.id) { std::collections::hash_map::Entry::Vacant(entry) => { - let (task, task_context) = Task::new(cert.id, self.thresholds.clone()); + let broadcast_state = BroadcastState::new( + cert.clone(), + self.thresholds.echo_threshold, + self.thresholds.ready_threshold, + self.thresholds.delivery_threshold, + self.event_sender.clone(), + self.subscriptions.clone(), + need_gossip, + ); + + let (task, task_context) = Task::new(cert.id, broadcast_state); self.running_tasks.push(task.into_future()); - entry.insert(task_context) + entry.insert(task_context); } - std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(), - }; - - _ = task.sink.send(msg).await; + std::collections::hash_map::Entry::Occupied(_) => {}, + } } } } - Some((id, status)) = self.running_tasks.next() => { + + + Some((certificate_id, status)) = self.running_tasks.next() => { if status == TaskStatus::Success { - self.remove_finished_task(id); - let _ = self.task_completion_sender.send((id, status)).await; + self.tasks.remove(&certificate_id); + let _ = self.task_completion_sender.send((certificate_id, status)).await; } } + _ = shutdown_receiver.recv() => { warn!("Task Manager shutting down"); - // Shutting down every open task for task in self.tasks.iter() { task.1.shutdown_sender.send(()).await.unwrap(); } @@ -94,11 +127,15 @@ impl TaskManager { break; } } - } - } - fn remove_finished_task(&mut self, certificate_id: CertificateId) { - self.tasks.remove(&certificate_id); + for (certificate_id, messages) in &mut self.buffered_messages { + if let Some(task) = self.tasks.get_mut(certificate_id) { + for msg in messages { + _ = task.sink.send(msg.clone()).await; + } + } + } + } } } diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/task.rs b/crates/topos-tce-broadcast/src/task_manager_futures/task.rs index 5dc2de5c1..9b01a4ebe 100644 --- 
a/crates/topos-tce-broadcast/src/task_manager_futures/task.rs +++ b/crates/topos-tce-broadcast/src/task_manager_futures/task.rs @@ -5,38 +5,28 @@ use tokio::sync::mpsc; use topos_core::uci::CertificateId; use tracing::warn; -use crate::task_manager_futures::Thresholds; -use crate::DoubleEchoCommand; - -#[derive(Debug, PartialEq, Eq)] -pub enum Events { - ReachedThresholdOfReady(CertificateId), - ReceivedEcho(CertificateId), - TimeOut(CertificateId), -} - -#[derive(Debug, PartialEq, Eq)] -pub enum TaskStatus { - /// The task finished succesfully and broadcasted the certificate + received ready - Success, - /// The task did not finish succesfully and stopped. - Failure, -} +use crate::double_echo::broadcast_state::{BroadcastState, Status}; +use crate::{DoubleEchoCommand, TaskStatus}; +#[derive(Debug)] pub struct TaskContext { pub sink: mpsc::Sender<DoubleEchoCommand>, pub shutdown_sender: mpsc::Sender<()>, } +#[derive(Debug)] pub struct Task { pub message_receiver: mpsc::Receiver<DoubleEchoCommand>, pub certificate_id: CertificateId, - pub thresholds: Thresholds, + pub broadcast_state: BroadcastState, pub shutdown_receiver: mpsc::Receiver<()>, } impl Task { - pub fn new(certificate_id: CertificateId, thresholds: Thresholds) -> (Task, TaskContext) { + pub fn new( + certificate_id: CertificateId, + broadcast_state: BroadcastState, + ) -> (Task, TaskContext) { let (message_sender, message_receiver) = mpsc::channel(10_024); let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); @@ -48,7 +38,7 @@ impl Task { let task = Task { message_receiver, certificate_id, - thresholds, + broadcast_state, shutdown_receiver, }; @@ -67,16 +57,17 @@ impl IntoFuture for Task { tokio::select! { Some(msg) = self.message_receiver.recv() => { match msg { - DoubleEchoCommand::Echo { certificate_id, .. } => { - return (certificate_id, TaskStatus::Success); - } - DoubleEchoCommand::Ready { certificate_id, .. 
} => { - return (certificate_id, TaskStatus::Success); + DoubleEchoCommand::Echo { from_peer, .. } => { + if let Some(Status::DeliveredWithReadySent) = self.broadcast_state.apply_echo(from_peer) { + return (self.certificate_id, TaskStatus::Success); + } } - DoubleEchoCommand::Broadcast { cert, .. } => { - return (cert.id, TaskStatus::Success); + DoubleEchoCommand::Ready { from_peer, .. } => { + if let Some(Status::DeliveredWithReadySent) = self.broadcast_state.apply_ready(from_peer) { + return (self.certificate_id, TaskStatus::Success); + } } - + _ => {} } } _ = self.shutdown_receiver.recv() => { diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 308b99d3d..80d8a4dbf 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -1,6 +1,3 @@ -mod task_manager_channels; -mod task_manager_futures; - use crate::double_echo::*; use crate::*; use rstest::*; @@ -9,12 +6,10 @@ use std::usize; use tce_transport::ReliableBroadcastParams; use tokio::sync::mpsc::Receiver; -use tokio::time::Duration; use topos_test_sdk::constants::*; const CHANNEL_SIZE: usize = 10; -const WAIT_EVENT_TIMEOUT: Duration = Duration::from_secs(1); #[fixture] fn small_config() -> TceParams { @@ -48,26 +43,23 @@ struct TceParams { struct Context { event_receiver: Receiver<ProtocolEvents>, - subscriptions_view_sender: Sender<SubscriptionsView>, - cmd_sender: Sender<DoubleEchoCommand>, - double_echo_shutdown_sender: Sender<oneshot::Sender<()>>, } -fn create_context(params: TceParams) -> (DoubleEcho, Context) { +async fn create_context(params: TceParams) -> (DoubleEcho, Context) { let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); - let (cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); + let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); - let (double_echo_shutdown_sender, 
double_echo_shutdown_receiver) = + let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = mpsc::channel::<oneshot::Sender<()>>(1); + let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE); let mut double_echo = DoubleEcho::new( params.broadcast_params, + task_manager_message_sender.clone(), cmd_receiver, - subscriptions_view_receiver, event_sender, double_echo_shutdown_receiver, - String::new(), 0, ); @@ -85,19 +77,20 @@ fn create_context(params: TceParams) -> (DoubleEcho, Context) { double_echo.subscriptions.ready = peers.clone(); double_echo.subscriptions.network_size = params.nb_peers; - ( - double_echo, - Context { - event_receiver, - // subscribers_update_sender, - subscriptions_view_sender, - cmd_sender, - double_echo_shutdown_sender, - }, - ) + let msg = SubscriptionsView { + echo: peers.clone(), + ready: peers.clone(), + network_size: params.nb_peers, + }; + + subscriptions_view_sender.send(msg).await.unwrap(); + + double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver); + + (double_echo, Context { event_receiver }) } -fn reach_echo_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { +async fn reach_echo_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { let selected = double_echo .subscriptions .echo @@ -107,11 +100,11 @@ fn reach_echo_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { .collect::<Vec<_>>(); for p in selected { - double_echo.consume_echo(p, &cert.id); + double_echo.handle_echo(p, cert.id).await; } } -fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { +async fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { let selected = double_echo .subscriptions .ready @@ -121,11 +114,11 @@ fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { .collect::<Vec<_>>(); for p in selected { - double_echo.consume_ready(p, &cert.id); + double_echo.handle_ready(p, 
cert.id).await; } } -fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { +async fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { let selected = double_echo .subscriptions .ready @@ -135,7 +128,7 @@ fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { .collect::<Vec<_>>(); for p in selected { - double_echo.consume_ready(p, &cert.id); + double_echo.handle_ready(p, cert.id).await; } } @@ -145,7 +138,7 @@ fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { #[tokio::test] #[trace] async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) { - let (mut double_echo, mut ctx) = create_context(params); + let (mut double_echo, mut ctx) = create_context(params).await; let dummy_cert = Certificate::new( PREV_CERTIFICATE_ID, @@ -159,11 +152,11 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) .expect("Dummy certificate"); // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true); + double_echo.broadcast(dummy_cert.clone(), true).await; assert!(matches!( - ctx.event_receiver.try_recv(), - Ok(ProtocolEvents::Broadcast { certificate_id }) if certificate_id == dummy_cert.id + ctx.event_receiver.recv().await, + Some(ProtocolEvents::Broadcast { certificate_id }) if certificate_id == dummy_cert.id )); assert!(matches!( @@ -181,19 +174,19 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) )); // Trigger Ready upon reaching the Echo threshold - reach_echo_threshold(&mut double_echo, &dummy_cert); + reach_echo_threshold(&mut double_echo, &dummy_cert).await; assert!(matches!( - ctx.event_receiver.try_recv(), - Ok(ProtocolEvents::Ready { .. }) + ctx.event_receiver.recv().await, + Some(ProtocolEvents::Ready { .. 
}) )); // Trigger Delivery upon reaching the Delivery threshold - reach_delivery_threshold(&mut double_echo, &dummy_cert); + reach_delivery_threshold(&mut double_echo, &dummy_cert).await; assert!(matches!( - ctx.event_receiver.try_recv(), - Ok(ProtocolEvents::CertificateDelivered { certificate }) if certificate == dummy_cert + ctx.event_receiver.recv().await, + Some(ProtocolEvents::CertificateDelivered { certificate }) if certificate == dummy_cert )); } @@ -203,7 +196,7 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) #[tokio::test] #[trace] async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { - let (mut double_echo, mut ctx) = create_context(params); + let (mut double_echo, mut ctx) = create_context(params).await; let dummy_cert = Certificate::new( PREV_CERTIFICATE_ID, @@ -217,11 +210,11 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { .expect("Dummy certificate"); // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true); + double_echo.broadcast(dummy_cert.clone(), true).await; assert!(matches!( - ctx.event_receiver.try_recv(), - Ok(ProtocolEvents::Broadcast { certificate_id }) if certificate_id == dummy_cert.id + ctx.event_receiver.recv().await, + Some(ProtocolEvents::Broadcast { certificate_id }) if certificate_id == dummy_cert.id )); assert!(matches!( @@ -235,61 +228,10 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { )); // Trigger Ready upon reaching the Ready threshold - reach_ready_threshold(&mut double_echo, &dummy_cert); + reach_ready_threshold(&mut double_echo, &dummy_cert).await; assert!(matches!( - ctx.event_receiver.try_recv(), - Ok(ProtocolEvents::Ready { .. }) + ctx.event_receiver.recv().await, + Some(ProtocolEvents::Ready { .. 
}) )); } - -#[rstest] -#[case::small_config(small_config())] -#[tokio::test] -async fn buffering_certificate(#[case] params: TceParams) { - let (double_echo, mut ctx) = create_context(params); - - let subscriptions = double_echo.subscriptions.clone(); - - spawn(double_echo.run()); - - // Wait to receive subscribers - tokio::time::sleep(WAIT_EVENT_TIMEOUT).await; - - let le_cert = Certificate::default(); - ctx.cmd_sender - .send(DoubleEchoCommand::Broadcast { - need_gossip: true, - cert: le_cert.clone(), - }) - .await - .expect("Cannot send broadcast command"); - - ctx.subscriptions_view_sender - .send(subscriptions.clone()) - .await - .expect("Cannot send expected view"); - - let mut received_gossip_commands: Vec<Certificate> = Vec::new(); - let assertion = async { - while let Some(event) = ctx.event_receiver.recv().await { - if let ProtocolEvents::Gossip { cert, .. } = event { - received_gossip_commands.push(cert); - } - } - }; - - let _ = tokio::time::timeout(Duration::from_secs(1), assertion).await; - - assert_eq!(received_gossip_commands.len(), 1); - assert_eq!(received_gossip_commands[0].id, le_cert.id); - - // Test shutdown - info!("Waiting for double echo to shutdown..."); - let (sender, receiver) = oneshot::channel(); - ctx.double_echo_shutdown_sender - .send(sender) - .await - .expect("Valid shutdown signal sending"); - assert_eq!(receiver.await, Ok(())); -} diff --git a/crates/topos-tce-broadcast/src/tests/task_manager_channels.rs b/crates/topos-tce-broadcast/src/tests/task_manager_channels.rs deleted file mode 100644 index 2b5339cbd..000000000 --- a/crates/topos-tce-broadcast/src/tests/task_manager_channels.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::task_manager_channels::{TaskManager, Thresholds}; - -use crate::*; -use rand::Rng; -use rstest::*; -use std::collections::HashMap; -use tokio::{spawn, sync::mpsc}; -use topos_p2p::PeerId; - -#[rstest] -#[tokio::test] -async fn task_manager_channels_receiving_messages() { - let n = 5; - - let 
(message_sender, message_receiver) = mpsc::channel(1024); - let (task_completion_sender, task_completion_receiver) = mpsc::channel(1024); - let (event_sender, mut event_receiver) = mpsc::channel(1024); - - let task_manager = TaskManager { - message_receiver, - task_completion: task_completion_receiver, - task_context: HashMap::new(), - thresholds: Thresholds { - echo: n, - ready: n, - delivery: n, - }, - }; - - spawn(task_manager.run(task_completion_sender, event_sender)); - - let mut certificates = vec![]; - - let mut rng = rand::thread_rng(); - - for _ in 0..10 { - let mut id = [0u8; 32]; - rng.fill(&mut id); - let cert_id = CertificateId::from_array(id); - certificates.push(cert_id); - } - - for certificate_id in certificates { - for _ in 0..n { - let echo = DoubleEchoCommand::Echo { - from_peer: PeerId::random(), - certificate_id, - }; - - message_sender.send(echo).await.unwrap(); - } - } - - let mut count = 0; - - while (event_receiver.recv().await).is_some() { - count += 1; - - if count == n { - break; - } - } -} diff --git a/crates/topos-tce-broadcast/src/tests/task_manager_futures.rs b/crates/topos-tce-broadcast/src/tests/task_manager_futures.rs deleted file mode 100644 index 0dde484e9..000000000 --- a/crates/topos-tce-broadcast/src/tests/task_manager_futures.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::task_manager_futures::{TaskManager, Thresholds}; -use crate::{CertificateId, DoubleEchoCommand}; -use futures::stream::FuturesUnordered; -use rand::Rng; -use rstest::*; -use tokio::{spawn, sync::mpsc}; -use topos_p2p::PeerId; - -#[rstest] -#[tokio::test] -async fn task_manager_futures_receiving_messages() { - let n = 5; - - let (message_sender, message_receiver) = mpsc::channel(1024); - let (task_completion_sender, mut task_completion_receiver) = mpsc::channel(1024); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - let task_manager = TaskManager { - message_receiver, - task_completion_sender, - tasks: Default::default(), - running_tasks: 
FuturesUnordered::new(), - thresholds: Thresholds { - echo: n, - ready: n, - delivery: n, - }, - shutdown_sender, - }; - - spawn(task_manager.run(shutdown_receiver)); - - let mut certificates = vec![]; - - let mut rng = rand::thread_rng(); - - for _ in 0..10 { - let mut id = [0u8; 32]; - rng.fill(&mut id); - let cert_id = CertificateId::from_array(id); - certificates.push(cert_id); - } - - for certificate_id in certificates { - for _ in 0..n { - let echo = DoubleEchoCommand::Echo { - from_peer: PeerId::random(), - certificate_id, - }; - - message_sender.send(echo).await.unwrap(); - } - } - - let mut count = 0; - - while let Some((_, _)) = task_completion_receiver.recv().await { - count += 1; - - if count == n { - break; - } - } -}