Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

chore: bootstrapping task-manager through feature flags #266

Merged
merged 24 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1373de3
chore: adding task-manager feature flags
gruberb Jul 21, 2023
25b4983
fix: remove reduntant feature flag
gruberb Jul 21, 2023
5ee8db3
fix: make the taskmanager new function coherent
gruberb Jul 21, 2023
4fd6ff0
fix: cleanup task_managers and tasks
gruberb Jul 21, 2023
7fec500
chore: intatiate and start the task_manager
gruberb Jul 21, 2023
53986bf
Merge branch 'main' into feat/TP-644
gruberb Jul 21, 2023
cdda231
chore: add echo and ready workflow, wip
gruberb Jul 24, 2023
71c33b2
chore: move handle_x functions to taskmanager, wip
gruberb Jul 24, 2023
07f8678
chore: move event_sender to taskmanager
gruberb Jul 24, 2023
5d7ad60
chore: add workflow to futures task manager, wip
gruberb Jul 24, 2023
23f854f
fix: adjust task returns
gruberb Jul 24, 2023
e370f3a
fix: tests, wip
gruberb Jul 24, 2023
88550ea
fix: test errors, wip
gruberb Jul 24, 2023
6668fab
fix: debugging tests
gruberb Jul 25, 2023
a1d9b74
fix: add hint to restructure the broadcast, wip
gruberb Jul 25, 2023
757b3c1
chore: refactor task_manager tests
Freyskeyd Jul 25, 2023
4a8bbb7
fix: pr review, cleaning up tests and clippy messages
gruberb Jul 25, 2023
da16ac7
chore: add task-manager-channels to double-echo
gruberb Jul 25, 2023
d471220
fix: add constants for channel sizes
gruberb Jul 25, 2023
bfffea1
fix: add doc comments for double echo
gruberb Jul 25, 2023
e2c0b0e
chore: adapt task-manager benches to faeture flag
gruberb Jul 26, 2023
2b31655
fix: fix cargo xclippy
gruberb Jul 26, 2023
cc829c3
fix: cert-delivery test, forward subscription view
gruberb Jul 26, 2023
55b0530
fix: pr review
gruberb Jul 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

#[cfg(feature = "task-manager-channels")]
mod task_manager_channels;
#[cfg(not(feature = "task-manager-channels"))]
mod task_manager_futures;
mod task_manager;

pub fn criterion_benchmark(c: &mut Criterion) {
let echo_messages = 10;
let certificates = 10_000;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

#[cfg(feature = "task-manager-channels")]
c.bench_function("double_echo with channels", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_channels::processing_double_echo(echo_messages).await
})
})
});

#[cfg(not(feature = "task-manager-channels"))]
c.bench_function("double_echo with futures", |b| {
c.bench_function("double_echo", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_futures::processing_double_echo(echo_messages).await
})
runtime.block_on(async { task_manager::processing_double_echo(certificates).await })
})
});
}
Expand Down
121 changes: 121 additions & 0 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::collections::HashSet;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use topos_tce_broadcast::double_echo::DoubleEcho;
use topos_tce_broadcast::sampler::SubscriptionsView;
use topos_test_sdk::certificates::create_certificate_chain;
use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1};

const CHANNEL_SIZE: usize = 256_000;

struct TceParams {
nb_peers: usize,
broadcast_params: ReliableBroadcastParams,
}

struct Context {
event_receiver: Receiver<ProtocolEvents>,
}

pub async fn processing_double_echo(n: u64) {
let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE);

let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE);

let params = TceParams {
nb_peers: 10,
broadcast_params: ReliableBroadcastParams {
echo_threshold: 8,
ready_threshold: 5,
delivery_threshold: 8,
},
};

let mut ctx = Context { event_receiver };

let mut double_echo = DoubleEcho::new(
params.broadcast_params,
task_manager_message_sender.clone(),
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
0,
);

// List of peers
let mut peers = HashSet::new();
for i in 0..params.nb_peers {
let peer = topos_p2p::utils::local_key_pair(Some(i as u8))
.public()
.to_peer_id();
peers.insert(peer);
}

// Subscriptions
double_echo.subscriptions.echo = peers.clone();
double_echo.subscriptions.ready = peers.clone();
double_echo.subscriptions.network_size = params.nb_peers;

let msg = SubscriptionsView {
echo: peers.clone(),
ready: peers.clone(),
network_size: params.nb_peers,
};

subscriptions_view_sender.send(msg).await.unwrap();

double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver);

let certificates =
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize);

let double_echo_selected_echo = double_echo
.subscriptions
.echo
.iter()
.take(double_echo.params.echo_threshold)
.cloned()
.collect::<Vec<_>>();

let double_echo_selected_ready = double_echo
.subscriptions
.ready
.iter()
.take(double_echo.params.delivery_threshold)
.cloned()
.collect::<Vec<_>>();

for cert in &certificates {
double_echo.broadcast(cert.clone(), true).await;
}

for cert in &certificates {
for p in &double_echo_selected_echo {
double_echo.handle_echo(p.clone(), cert.id).await;
}

for p in &double_echo_selected_ready {
double_echo.handle_ready(p.clone(), cert.id).await;
}
}

let mut count = 0;

while let Some(event) = ctx.event_receiver.recv().await {
match event {
ProtocolEvents::CertificateDelivered { .. } => {
count += 1;

if count == n {
break;
}
}
_ => {}
}
}
}
56 changes: 0 additions & 56 deletions crates/topos-tce-broadcast/benches/task_manager_channels.rs

This file was deleted.

57 changes: 0 additions & 57 deletions crates/topos-tce-broadcast/benches/task_manager_futures.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ impl BroadcastState {
let event = ProtocolEvents::Ready {
certificate_id: self.certificate.id,
};
self.event_sender.try_send(event).unwrap();
if let Err(e) = self.event_sender.try_send(event) {
println!("Error sending Ready message: {}", e);
gruberb marked this conversation as resolved.
Show resolved Hide resolved
}

self.status = self.status.ready_sent();

Expand Down
14 changes: 7 additions & 7 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ pub struct DoubleEcho {
/// Delivered certificate ids to avoid processing twice the same certificate
delivered_certificates: HashSet<CertificateId>,
/// The threshold parameters for the double echo
pub(crate) params: ReliableBroadcastParams,
pub params: ReliableBroadcastParams,
/// The connection to the TaskManager to forward DoubleEchoCommand messages
task_manager_message_sender: mpsc::Sender<DoubleEchoCommand>,
gruberb marked this conversation as resolved.
Show resolved Hide resolved
/// The overview of the network, which holds echo and ready subscriptions and the network size
pub(crate) subscriptions: SubscriptionsView,
pub subscriptions: SubscriptionsView,
}

impl DoubleEcho {
Expand All @@ -51,7 +51,7 @@ impl DoubleEcho {
}

#[cfg(not(feature = "task-manager-channels"))]
pub(crate) fn spawn_task_manager(
pub fn spawn_task_manager(
&mut self,
subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
Expand All @@ -72,7 +72,7 @@ impl DoubleEcho {
}

#[cfg(feature = "task-manager-channels")]
pub(crate) fn spawn_task_manager(
pub fn spawn_task_manager(
&mut self,
subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
Expand Down Expand Up @@ -162,7 +162,7 @@ impl DoubleEcho {
/// Called to process potentially new certificate:
/// - either submitted from API ( [tce_transport::TceCommands::Broadcast] command)
/// - or received through the gossip (first step of protocol exchange)
pub(crate) async fn broadcast(&mut self, cert: Certificate, origin: bool) {
pub async fn broadcast(&mut self, cert: Certificate, origin: bool) {
info!("🙌 Starting broadcasting the Certificate {}", &cert.id);
if self.cert_pre_broadcast_check(&cert).is_err() {
error!("Failure on the pre-check for the Certificate {}", &cert.id);
Expand Down Expand Up @@ -238,7 +238,7 @@ impl DoubleEcho {
}

impl DoubleEcho {
pub(crate) async fn handle_echo(&mut self, from_peer: PeerId, certificate_id: CertificateId) {
pub async fn handle_echo(&mut self, from_peer: PeerId, certificate_id: CertificateId) {
if self.delivered_certificates.get(&certificate_id).is_none() {
let _ = self
.task_manager_message_sender
Expand All @@ -250,7 +250,7 @@ impl DoubleEcho {
}
}

pub(crate) async fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) {
pub async fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) {
if self.delivered_certificates.get(&certificate_id).is_none() {
let _ = self
.task_manager_message_sender
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) {
ready: peers.clone(),
network_size: params.nb_peers,
};
let _ = subscriptions_view_sender.send(msg).await.unwrap();

subscriptions_view_sender.send(msg).await.unwrap();

double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver);

Expand Down