Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: adding taskmanager and criterion benches #263

Merged
merged 13 commits into from
Jul 20, 2023
128 changes: 128 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ topos-tce-storage = { path = "../topos-tce-storage/" }
topos-telemetry = { path = "../topos-telemetry/" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] }
test-log.workspace = true
env_logger.workspace = true
rstest.workspace = true
rand.workspace = true

topos-test-sdk = { path = "../topos-test-sdk/" }

[features]
default = []

[[bench]]
name = "double_echo"
path = "benches/double_echo.rs"
harness = false
gruberb marked this conversation as resolved.
Show resolved Hide resolved
32 changes: 32 additions & 0 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

mod task_manager_channels;
mod task_manager_futures;

pub fn criterion_benchmark(c: &mut Criterion) {
let echo_messages = 10;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("double_echo with channels", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_channels::processing_double_echo(echo_messages).await
})
})
});

c.bench_function("double_echo with futures", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_futures::processing_double_echo(echo_messages).await
})
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
63 changes: 63 additions & 0 deletions crates/topos-tce-broadcast/benches/task_manager_channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::collections::HashMap;

use rand::Rng;
use tokio::spawn;
use tokio::sync::mpsc;
use tracing::Span;

use topos_core::uci::CertificateId;
use topos_p2p::PeerId;
use topos_tce_broadcast::task_manager_channels::{TaskManager, Thresholds};
use topos_tce_broadcast::DoubleEchoCommand;

pub async fn processing_double_echo(n: u64) {
let (message_sender, message_receiver) = mpsc::channel(1024);
let (task_completion_sender, task_completion_receiver) = mpsc::channel(1024);
let (event_sender, mut event_receiver) = mpsc::channel(1024);

let task_manager = TaskManager {
message_receiver,
task_completion: task_completion_receiver,
task_context: HashMap::new(),
thresholds: Thresholds {
echo: n as usize,
ready: n as usize,
delivery: n as usize,
},
};

spawn(task_manager.run(task_completion_sender, event_sender));

let mut certificates = vec![];

let mut rng = rand::thread_rng();

for _ in 0..10_000 {
let mut id = [0u8; 32];
rng.fill(&mut id);
let cert_id = CertificateId::from_array(id);
certificates.push(cert_id);
}

for certificate_id in certificates {
for _ in 0..n {
let echo = DoubleEchoCommand::Echo {
from_peer: PeerId::random(),
certificate_id,
ctx: Span::current(),
};

message_sender.send(echo).await.unwrap();
}
}

let mut count = 0;

while (event_receiver.recv().await).is_some() {
count += 1;

if count == n {
break;
}
}
}
64 changes: 64 additions & 0 deletions crates/topos-tce-broadcast/benches/task_manager_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use futures::stream::FuturesUnordered;
use rand::Rng;
use tokio::spawn;
use tokio::sync::mpsc;
use topos_core::uci::CertificateId;
use topos_p2p::PeerId;
use topos_tce_broadcast::DoubleEchoCommand;
use tracing::Span;

use topos_tce_broadcast::task_manager_futures::{TaskManager, Thresholds};

pub async fn processing_double_echo(n: u64) {
let (message_sender, message_receiver) = mpsc::channel(1024);
let (task_completion_sender, mut task_completion_receiver) = mpsc::channel(48_000);
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);

let task_manager = TaskManager {
message_receiver,
task_completion_sender,
tasks: Default::default(),
running_tasks: FuturesUnordered::new(),
thresholds: Thresholds {
echo: n as usize,
ready: n as usize,
delivery: n as usize,
},
shutdown_sender,
};

spawn(task_manager.run(shutdown_receiver));

let mut certificates = vec![];

let mut rng = rand::thread_rng();

for _ in 0..10_000 {
let mut id = [0u8; 32];
rng.fill(&mut id);
let cert_id = CertificateId::from_array(id);
certificates.push(cert_id);
}

for certificate_id in certificates {
for _ in 0..n {
let echo = DoubleEchoCommand::Echo {
from_peer: PeerId::random(),
certificate_id,
ctx: Span::current(),
};

message_sender.send(echo).await.unwrap();
}
}

let mut count = 0;

while let Some((_, _)) = task_completion_receiver.recv().await {
count += 1;

if count == n {
break;
}
}
}
2 changes: 2 additions & 0 deletions crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ mod constant;
pub mod double_echo;
pub mod sampler;

pub mod task_manager_channels;
pub mod task_manager_futures;
#[cfg(test)]
mod tests;

Expand Down
Loading