From 8ce6f3144050afe6a3069739a653f5019c945db1 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 19 Nov 2024 09:54:13 +0100 Subject: [PATCH 01/10] Use iter_batched --- .../network/benches/notifications_protocol.rs | 95 +++++++++++-------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index c1e18c7b7f47..1594e02d0c93 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -114,14 +114,16 @@ where (worker, notification_service) } -async fn run_serially(size: usize, limit: usize) -where +async fn run_serially( + listen_address1: sc_network::Multiaddr, + listen_address2: sc_network::Multiaddr, + size: usize, + limit: usize, +) where B: BlockT + 'static, H: ExHashT, N: NetworkBackend, { - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); let (worker1, mut notification_service1) = create_network_worker::(listen_address1); let (worker2, mut notification_service2) = create_network_worker::(listen_address2.clone()); @@ -299,44 +301,57 @@ fn run_benchmark(c: &mut Criterion) { BenchmarkId::new("libp2p/serially", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_serially::>(size, limit) - }); - }, - ); - group.bench_with_input( - BenchmarkId::new("litep2p/serially", label), - &(size, NOTIFICATIONS), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_serially::( - size, limit, - ) - }); - }, - ); - group.bench_with_input( - BenchmarkId::new("libp2p/with_backpressure", label), - &(size, NOTIFICATIONS), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_with_backpressure::>( - size, limit, - ) - }); - }, - ); - group.bench_with_input( - BenchmarkId::new("litep2p/with_backpressure", label), - &(size, NOTIFICATIONS), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_with_backpressure::( - size, limit, - ) - }); + b.to_async(&rt).iter_batched( + || { + let listen_address1 = get_listen_address(); + let listen_address2 = get_listen_address(); + (listen_address1, listen_address2) + }, + |(listen_address1, listen_address2)| { + run_serially::>( + listen_address1, + listen_address2, + size, + limit, + ) + }, + criterion::BatchSize::SmallInput, + ); }, ); + // group.bench_with_input( + // BenchmarkId::new("litep2p/serially", label), + // &(size, NOTIFICATIONS), + // |b, &(size, limit)| { + // b.to_async(&rt).iter(|| { + // run_serially::( + // size, limit, + // ) + // }); + // }, + // ); + // group.bench_with_input( + // BenchmarkId::new("libp2p/with_backpressure", label), + // &(size, NOTIFICATIONS), + // |b, &(size, limit)| { + // b.to_async(&rt).iter(|| { + // run_with_backpressure::>( + // size, limit, + // ) + // }); + // }, + // ); + // group.bench_with_input( + // BenchmarkId::new("litep2p/with_backpressure", label), + // &(size, NOTIFICATIONS), + // |b, &(size, limit)| { + // b.to_async(&rt).iter(|| { + // run_with_backpressure::( + // size, limit, + // ) + // }); + // }, + // ); } } From d8a7bc1a5417bf92f008a26da7c58023a08a5948 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 20 Nov 2024 10:20:53 +0100 Subject: [PATCH 02/10] Setup workers --- .../network/benches/notifications_protocol.rs | 307 ++++++++---------- 1 file changed, 129 insertions(+), 178 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 1594e02d0c93..89091f265903 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use assert_matches::assert_matches; use criterion::{ criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, Throughput, @@ -27,7 +28,7 @@ use sc_network::{ }, service::traits::NotificationEvent, Litep2pNetworkBackend, NetworkBackend, NetworkWorker, NotificationMetrics, NotificationService, - Roles, + PeerId, Roles, }; use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT}; use sc_network_types::build_multiaddr; @@ -114,16 +115,14 @@ where (worker, notification_service) } -async fn run_serially( - listen_address1: sc_network::Multiaddr, - listen_address2: sc_network::Multiaddr, - size: usize, - limit: usize, -) where +fn setup_workers() -> (Box, Box, PeerId) +where B: BlockT + 'static, H: ExHashT, N: NetworkBackend, { + let listen_address1 = get_listen_address(); + let listen_address2 = get_listen_address(); let (worker1, mut notification_service1) = create_network_worker::(listen_address1); let (worker2, mut notification_service2) = create_network_worker::(listen_address2.clone()); @@ -134,74 +133,72 @@ async fn run_serially( .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) .unwrap(); - let network1_run = worker1.run(); - let network2_run = worker2.run(); - let (tx, rx) = async_channel::bounded(10); - - let network1 = tokio::spawn(async move { - let mut sent_counter = 0; - tokio::pin!(network1_run); + let mut notification_service_cloned1 = notification_service1.clone().unwrap(); + let mut notification_service_cloned2 = notification_service2.clone().unwrap(); + tokio::spawn(worker1.run()); + tokio::spawn(worker2.run()); + let ready = tokio::spawn(async move { loop { tokio::select! { - _ = &mut network1_run => {}, - event = notification_service1.next_event() => { - match event { - Some(NotificationEvent::NotificationStreamOpened { .. }) => { - sent_counter += 1; - notification_service1 - .send_async_notification(&peer_id2, vec![0; size]) - .await - .unwrap(); - }, - Some(NotificationEvent::NotificationStreamClosed { .. }) => { - if sent_counter >= limit { - break; - } - panic!("Unexpected stream closure {:?}", event); - } - event => panic!("Unexpected event {:?}", event), - }; - }, - message = rx.recv() => { - match message { - Ok(Some(_)) => { - sent_counter += 1; - notification_service1 - .send_async_notification(&peer_id2, vec![0; size]) - .await - .unwrap(); - }, - Ok(None) => break, - Err(err) => panic!("Unexpected error {:?}", err), - + Some(event) = notification_service_cloned1.next_event() => { + if let NotificationEvent::NotificationStreamOpened { .. } = event { + break; } - } + }, + Some(_event) = notification_service_cloned2.next_event() => {}, } } }); + + tokio::task::block_in_place(|| { + let _ = tokio::runtime::Handle::current().block_on(ready); + }); + + (notification_service1, notification_service2, peer_id2) +} + +async fn run_serially( + (mut notification_service1, mut notification_service2, peer_id2): ( + Box, + Box, + PeerId, + ), + size: usize, + limit: usize, +) { + let (tx, rx) = async_channel::bounded(1); + let ready_tx = tx.clone(); + let network1 = tokio::spawn(async move { + while let Some(event) = notification_service1.next_event().await { + if let NotificationEvent::NotificationStreamOpened { .. } = event { + let _ = ready_tx.send(Some(())).await; + break; + }; + } + while let Ok(message) = rx.recv().await { + let Some(_) = message else { break }; + notification_service1 + .send_async_notification(&peer_id2, vec![0; size]) + .await + .unwrap(); + } + }); let network2 = tokio::spawn(async move { let mut received_counter = 0; - tokio::pin!(network2_run); - loop { - tokio::select! { - _ = &mut network2_run => {}, - event = notification_service2.next_event() => { - match event { - Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => { - result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); - }, - Some(NotificationEvent::NotificationStreamOpened { .. }) => {}, - Some(NotificationEvent::NotificationReceived { .. }) => { - received_counter += 1; - if received_counter >= limit { - let _ = tx.send(None).await; - break - } - let _ = tx.send(Some(())).await; - }, - event => panic!("Unexpected event {:?}", event), - }; - }, + while let Some(event) = notification_service2.next_event().await { + assert_matches!( + event, + NotificationEvent::ValidateInboundSubstream { .. } | + NotificationEvent::NotificationStreamOpened { .. } | + NotificationEvent::NotificationReceived { .. } + ); + if let NotificationEvent::NotificationReceived { .. } = event { + received_counter += 1; + if received_counter >= limit { + let _ = tx.send(None).await; + break; + } + let _ = tx.send(Some(())).await; } } }); @@ -209,77 +206,42 @@ async fn run_serially( let _ = tokio::join!(network1, network2); } -async fn run_with_backpressure(size: usize, limit: usize) -where - B: BlockT + 'static, - H: ExHashT, - N: NetworkBackend, -{ - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); - let (worker1, mut notification_service1) = create_network_worker::(listen_address1); - let (worker2, mut notification_service2) = - create_network_worker::(listen_address2.clone()); - let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into(); - - worker1 - .network_service() - .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) - .unwrap(); - - let network1_run = worker1.run(); - let network2_run = worker2.run(); - +async fn run_with_backpressure( + (mut notification_service1, mut notification_service2, peer_id2): ( + Box, + Box, + PeerId, + ), + size: usize, + limit: usize, +) { let network1 = tokio::spawn(async move { - let mut sent_counter = 0; - tokio::pin!(network1_run); - loop { - tokio::select! { - _ = &mut network1_run => {}, - event = notification_service1.next_event() => { - match event { - Some(NotificationEvent::NotificationStreamOpened { .. }) => { - while sent_counter < limit { - sent_counter += 1; - notification_service1 - .send_async_notification(&peer_id2, vec![0; size]) - .await - .unwrap(); - } - }, - Some(NotificationEvent::NotificationStreamClosed { .. }) => { - if sent_counter != limit { panic!("Stream closed unexpectedly") } - break - }, - event => panic!("Unexpected event {:?}", event), - }; - }, + while let Some(event) = notification_service1.next_event().await { + if let NotificationEvent::NotificationStreamOpened { .. } = event { + for _ in 0..limit { + notification_service1 + .send_async_notification(&peer_id2, vec![0; size]) + .await + .unwrap(); + } + break; } } }); let network2 = tokio::spawn(async move { let mut received_counter = 0; - tokio::pin!(network2_run); - loop { - tokio::select! { - _ = &mut network2_run => {}, - event = notification_service2.next_event() => { - match event { - Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => { - result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); - }, - Some(NotificationEvent::NotificationStreamOpened { .. }) => {}, - Some(NotificationEvent::NotificationStreamClosed { .. }) => { - if received_counter != limit { panic!("Stream closed unexpectedly") } - break - }, - Some(NotificationEvent::NotificationReceived { .. }) => { - received_counter += 1; - if received_counter >= limit { break } - }, - event => panic!("Unexpected event {:?}", event), - }; - }, + while let Some(event) = notification_service2.next_event().await { + assert_matches!( + event, + NotificationEvent::ValidateInboundSubstream { .. } | + NotificationEvent::NotificationStreamOpened { .. } | + NotificationEvent::NotificationReceived { .. } + ); + if let NotificationEvent::NotificationReceived { .. } = event { + received_counter += 1; + if received_counter >= limit { + break + } } } }); @@ -302,56 +264,45 @@ fn run_benchmark(c: &mut Criterion) { &(size, NOTIFICATIONS), |b, &(size, limit)| { b.to_async(&rt).iter_batched( - || { - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); - (listen_address1, listen_address2) - }, - |(listen_address1, listen_address2)| { - run_serially::>( - listen_address1, - listen_address2, - size, - limit, - ) - }, + setup_workers::>, + |setup| run_serially(setup, size, limit), + criterion::BatchSize::SmallInput, + ); + }, + ); + group.bench_with_input( + BenchmarkId::new("litep2p/serially", label), + &(size, NOTIFICATIONS), + |b, &(size, limit)| { + b.to_async(&rt).iter_batched( + setup_workers::, + |setup| run_serially(setup, size, limit), + criterion::BatchSize::SmallInput, + ); + }, + ); + group.bench_with_input( + BenchmarkId::new("libp2p/with_backpressure", label), + &(size, NOTIFICATIONS), + |b, &(size, limit)| { + b.to_async(&rt).iter_batched( + setup_workers::>, + |setup| run_with_backpressure(setup, size, limit), + criterion::BatchSize::SmallInput, + ); + }, + ); + group.bench_with_input( + BenchmarkId::new("litep2p/with_backpressure", label), + &(size, NOTIFICATIONS), + |b, &(size, limit)| { + b.to_async(&rt).iter_batched( + setup_workers::, + |setup| run_with_backpressure(setup, size, limit), criterion::BatchSize::SmallInput, ); }, ); - // group.bench_with_input( - // BenchmarkId::new("litep2p/serially", label), - // &(size, NOTIFICATIONS), - // |b, &(size, limit)| { - // b.to_async(&rt).iter(|| { - // run_serially::( - // size, limit, - // ) - // }); - // }, - // ); - // group.bench_with_input( - // BenchmarkId::new("libp2p/with_backpressure", label), - // &(size, NOTIFICATIONS), - // |b, &(size, limit)| { - // b.to_async(&rt).iter(|| { - // run_with_backpressure::>( - // size, limit, - // ) - // }); - // }, - // ); - // group.bench_with_input( - // BenchmarkId::new("litep2p/with_backpressure", label), - // &(size, NOTIFICATIONS), - // |b, &(size, limit)| { - // b.to_async(&rt).iter(|| { - // run_with_backpressure::( - // size, limit, - // ) - // }); - // }, - // ); } } From 77b1e09e7871d89f2048889ee884e9abd5d5cf48 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 22 Nov 2024 09:32:32 +0100 Subject: [PATCH 03/10] Move to iter_batched_ref --- .../network/benches/notifications_protocol.rs | 188 +++++++++--------- 1 file changed, 95 insertions(+), 93 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 89091f265903..99307a7cfb72 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -37,8 +37,10 @@ use sp_runtime::traits::{Block as BlockT, Zero}; use std::{ net::{IpAddr, Ipv4Addr, TcpListener}, str::FromStr, + sync::Arc, }; use substrate_test_runtime_client::runtime; +use tokio::sync::Mutex; const MAX_SIZE: u64 = 2u64.pow(30); const SAMPLE_SIZE: usize = 50; @@ -115,7 +117,13 @@ where (worker, notification_service) } -fn setup_workers() -> (Box, Box, PeerId) +struct BenchSetup { + notification_service1: Arc>>, + notification_service2: Arc>>, + peer_id2: PeerId, +} + +fn setup_workers() -> Arc where B: BlockT + 'static, H: ExHashT, @@ -123,8 +131,8 @@ where { let listen_address1 = get_listen_address(); let listen_address2 = get_listen_address(); - let (worker1, mut notification_service1) = create_network_worker::(listen_address1); - let (worker2, mut notification_service2) = + let (worker1, notification_service1) = create_network_worker::(listen_address1); + let (worker2, notification_service2) = create_network_worker::(listen_address2.clone()); let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into(); @@ -133,19 +141,32 @@ where .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) .unwrap(); - let mut notification_service_cloned1 = notification_service1.clone().unwrap(); - let mut notification_service_cloned2 = notification_service2.clone().unwrap(); + let notification_service1 = Arc::new(Mutex::new(notification_service1)); + let notification_service2 = Arc::new(Mutex::new(notification_service2)); + tokio::spawn(worker1.run()); tokio::spawn(worker2.run()); - let ready = tokio::spawn(async move { - loop { - tokio::select! { - Some(event) = notification_service_cloned1.next_event() => { - if let NotificationEvent::NotificationStreamOpened { .. } = event { - break; - } - }, - Some(_event) = notification_service_cloned2.next_event() => {}, + + let ready = tokio::spawn({ + let notification_service1 = Arc::clone(¬ification_service1); + let notification_service2 = Arc::clone(¬ification_service2); + + async move { + let mut notification_service1 = notification_service1.lock().await; + let mut notification_service2 = notification_service2.lock().await; + loop { + tokio::select! { + Some(event) = notification_service1.next_event() => { + if let NotificationEvent::NotificationStreamOpened { .. } = event { + break; + } + }, + Some(event) = notification_service2.next_event() => { + if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } = event { + result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); + } + }, + } } } }); @@ -154,51 +175,40 @@ where let _ = tokio::runtime::Handle::current().block_on(ready); }); - (notification_service1, notification_service2, peer_id2) + Arc::new(BenchSetup { notification_service1, notification_service2, peer_id2 }) } -async fn run_serially( - (mut notification_service1, mut notification_service2, peer_id2): ( - Box, - Box, - PeerId, - ), - size: usize, - limit: usize, -) { +async fn run_serially(setup: Arc, size: usize, limit: usize) { let (tx, rx) = async_channel::bounded(1); - let ready_tx = tx.clone(); - let network1 = tokio::spawn(async move { - while let Some(event) = notification_service1.next_event().await { - if let NotificationEvent::NotificationStreamOpened { .. } = event { - let _ = ready_tx.send(Some(())).await; - break; - }; - } - while let Ok(message) = rx.recv().await { - let Some(_) = message else { break }; - notification_service1 - .send_async_notification(&peer_id2, vec![0; size]) - .await - .unwrap(); + let _ = tx.send(Some(())).await; + let network1 = tokio::spawn({ + let notification_service1 = Arc::clone(&setup.notification_service1); + let peer_id2 = setup.peer_id2; + async move { + let mut notification_service1 = notification_service1.lock().await; + while let Ok(message) = rx.recv().await { + let Some(_) = message else { break }; + notification_service1 + .send_async_notification(&peer_id2, vec![0; size]) + .await + .unwrap(); + } } }); - let network2 = tokio::spawn(async move { - let mut received_counter = 0; - while let Some(event) = notification_service2.next_event().await { - assert_matches!( - event, - NotificationEvent::ValidateInboundSubstream { .. } | - NotificationEvent::NotificationStreamOpened { .. } | - NotificationEvent::NotificationReceived { .. } - ); - if let NotificationEvent::NotificationReceived { .. } = event { - received_counter += 1; - if received_counter >= limit { - let _ = tx.send(None).await; - break; + let network2 = tokio::spawn({ + let notification_service2 = Arc::clone(&setup.notification_service2); + async move { + let mut notification_service2 = notification_service2.lock().await; + let mut received_counter = 0; + while let Some(event) = notification_service2.next_event().await { + if let NotificationEvent::NotificationReceived { .. } = event { + received_counter += 1; + if received_counter >= limit { + let _ = tx.send(None).await; + break; + } + let _ = tx.send(Some(())).await; } - let _ = tx.send(Some(())).await; } } }); @@ -206,41 +216,33 @@ async fn run_serially( let _ = tokio::join!(network1, network2); } -async fn run_with_backpressure( - (mut notification_service1, mut notification_service2, peer_id2): ( - Box, - Box, - PeerId, - ), - size: usize, - limit: usize, -) { - let network1 = tokio::spawn(async move { - while let Some(event) = notification_service1.next_event().await { - if let NotificationEvent::NotificationStreamOpened { .. } = event { - for _ in 0..limit { - notification_service1 - .send_async_notification(&peer_id2, vec![0; size]) - .await - .unwrap(); - } - break; +async fn run_with_backpressure(setup: Arc, size: usize, limit: usize) { + let (tx, rx) = async_channel::bounded(1); + let network1 = tokio::spawn({ + let setup = Arc::clone(&setup); + async move { + let mut notification_service1 = setup.notification_service1.lock().await; + for _ in 0..limit { + notification_service1 + .send_async_notification(&setup.peer_id2, vec![0; size]) + .await + .unwrap(); } + let _ = rx.recv().await; } }); - let network2 = tokio::spawn(async move { - let mut received_counter = 0; - while let Some(event) = notification_service2.next_event().await { - assert_matches!( - event, - NotificationEvent::ValidateInboundSubstream { .. } | - NotificationEvent::NotificationStreamOpened { .. } | - NotificationEvent::NotificationReceived { .. } - ); - if let NotificationEvent::NotificationReceived { .. } = event { - received_counter += 1; - if received_counter >= limit { - break + let network2 = tokio::spawn({ + let setup = Arc::clone(&setup); + async move { + let mut notification_service2 = setup.notification_service2.lock().await; + let mut received_counter = 0; + while let Some(event) = notification_service2.next_event().await { + if let NotificationEvent::NotificationReceived { .. } = event { + received_counter += 1; + if received_counter >= limit { + let _ = tx.send(()).await; + break; + } } } } @@ -263,9 +265,9 @@ fn run_benchmark(c: &mut Criterion) { BenchmarkId::new("libp2p/serially", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter_batched( + b.to_async(&rt).iter_batched_ref( setup_workers::>, - |setup| run_serially(setup, size, limit), + |setup| run_serially(Arc::clone(setup), size, limit), criterion::BatchSize::SmallInput, ); }, @@ -274,9 +276,9 @@ fn run_benchmark(c: &mut Criterion) { BenchmarkId::new("litep2p/serially", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter_batched( + b.to_async(&rt).iter_batched_ref( setup_workers::, - |setup| run_serially(setup, size, limit), + |setup| run_serially(Arc::clone(setup), size, limit), criterion::BatchSize::SmallInput, ); }, @@ -285,9 +287,9 @@ fn run_benchmark(c: &mut Criterion) { BenchmarkId::new("libp2p/with_backpressure", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter_batched( + b.to_async(&rt).iter_batched_ref( setup_workers::>, - |setup| run_with_backpressure(setup, size, limit), + |setup| run_with_backpressure(Arc::clone(setup), size, limit), criterion::BatchSize::SmallInput, ); }, @@ -296,9 +298,9 @@ fn run_benchmark(c: &mut Criterion) { BenchmarkId::new("litep2p/with_backpressure", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter_batched( + b.to_async(&rt).iter_batched_ref( setup_workers::, - |setup| run_with_backpressure(setup, size, limit), + |setup| run_with_backpressure(Arc::clone(setup), size, limit), criterion::BatchSize::SmallInput, ); }, From 32919fb4345ced695357d8511a2c35ba68877b8c Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 22 Nov 2024 14:27:48 +0100 Subject: [PATCH 04/10] Initialize workers aside --- .../network/benches/notifications_protocol.rs | 172 +++++++++--------- 1 file changed, 83 insertions(+), 89 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 99307a7cfb72..73d1f820c71e 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use assert_matches::assert_matches; use criterion::{ criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, Throughput, @@ -26,57 +25,42 @@ use sc_network::{ FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role, SetConfig, }, - service::traits::NotificationEvent, + service::traits::{NetworkService, NotificationEvent}, Litep2pNetworkBackend, NetworkBackend, NetworkWorker, NotificationMetrics, NotificationService, PeerId, Roles, }; use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT}; -use sc_network_types::build_multiaddr; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Zero}; -use std::{ - net::{IpAddr, Ipv4Addr, TcpListener}, - str::FromStr, - sync::Arc, -}; +use std::{sync::Arc, time::Duration}; use substrate_test_runtime_client::runtime; -use tokio::sync::Mutex; +use tokio::{sync::Mutex, task::JoinHandle}; -const MAX_SIZE: u64 = 2u64.pow(30); -const SAMPLE_SIZE: usize = 50; -const NOTIFICATIONS: usize = 50; -const EXPONENTS: &[(u32, &'static str)] = &[ - (6, "64B"), - (9, "512B"), - (12, "4KB"), - (15, "64KB"), - (18, "256KB"), - (21, "2MB"), - (24, "16MB"), - (27, "128MB"), +const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[ + // (Exponent of size, number of notifications, label) + (6, 100, "64B"), + (9, 100, "512B"), + (12, 100, "4KB"), + (15, 100, "64KB"), ]; - -// TODO: It's be better to bind system-provided port when initializing the worker -fn get_listen_address() -> sc_network::Multiaddr { - let ip = Ipv4Addr::from_str("127.0.0.1").unwrap(); - let listener = TcpListener::bind((IpAddr::V4(ip), 0)).unwrap(); // Bind to a random port - let local_addr = listener.local_addr().unwrap(); - let port = local_addr.port(); - - build_multiaddr!(Ip4(ip), Tcp(port)) -} +// const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ +// // (Exponent of size, number of notifications, label) +// (18, 1, "256KB"), +// (21, 1, "2MB"), +// (24, 1, "16MB"), +// (27, 1, "128MB"), +// ]; +const MAX_SIZE: u64 = 2u64.pow(30); fn create_network_worker( - listen_addr: sc_network::Multiaddr, -) -> (N, Box) +) -> (N, Arc, Arc>>) where B: BlockT + 'static, H: ExHashT, N: NetworkBackend, { let role = Role::Full; - let mut net_conf = NetworkConfiguration::new_local(); - net_conf.listen_addresses = vec![listen_addr]; + let net_conf = NetworkConfiguration::new_local(); let network_config = FullNetworkConfiguration::::new(&net_conf, None); let genesis_hash = runtime::Hash::zero(); let (block_announce_config, notification_service) = N::notification_config( @@ -113,45 +97,59 @@ where notification_metrics: NotificationMetrics::new(None), }) .unwrap(); + let network_service = worker.network_service(); + let notification_service = Arc::new(Mutex::new(notification_service)); - (worker, notification_service) + (worker, network_service, notification_service) } struct BenchSetup { notification_service1: Arc>>, notification_service2: Arc>>, peer_id2: PeerId, + handle1: JoinHandle<()>, + handle2: JoinHandle<()>, } -fn setup_workers() -> Arc +impl Drop for BenchSetup { + fn drop(&mut self) { + self.handle1.abort(); + self.handle2.abort(); + } +} + +fn setup_workers(rt: &tokio::runtime::Runtime) -> Arc where B: BlockT + 'static, H: ExHashT, N: NetworkBackend, { - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); - let (worker1, notification_service1) = create_network_worker::(listen_address1); - let (worker2, notification_service2) = - create_network_worker::(listen_address2.clone()); - let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into(); + let _guard = rt.enter(); - worker1 - .network_service() - .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) - .unwrap(); - - let notification_service1 = Arc::new(Mutex::new(notification_service1)); - let notification_service2 = Arc::new(Mutex::new(notification_service2)); - - tokio::spawn(worker1.run()); - tokio::spawn(worker2.run()); + let (worker1, network_service1, notification_service1) = create_network_worker::(); + let (worker2, network_service2, notification_service2) = create_network_worker::(); + let peer_id2: sc_network::PeerId = network_service2.local_peer_id().into(); + let handle1 = tokio::spawn(worker1.run()); + let handle2 = tokio::spawn(worker2.run()); let ready = tokio::spawn({ let notification_service1 = Arc::clone(¬ification_service1); let notification_service2 = Arc::clone(¬ification_service2); async move { + let listen_address2 = { + while network_service2.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + network_service2.listen_addresses()[0].clone() + }; + network_service1 + .add_reserved_peer(MultiaddrWithPeerId { + multiaddr: listen_address2, + peer_id: peer_id2, + }) + .unwrap(); + let mut notification_service1 = notification_service1.lock().await; let mut notification_service2 = notification_service2.lock().await; loop { @@ -175,7 +173,13 @@ where let _ = tokio::runtime::Handle::current().block_on(ready); }); - Arc::new(BenchSetup { notification_service1, notification_service2, peer_id2 }) + Arc::new(BenchSetup { + notification_service1, + notification_service2, + peer_id2, + handle1, + handle2, + }) } async fn run_serially(setup: Arc, size: usize, limit: usize) { @@ -254,63 +258,53 @@ async fn run_with_backpressure(setup: Arc, size: usize, limit: usize fn run_benchmark(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group("notifications_benchmark"); + let mut group = c.benchmark_group("libp2p"); group.plot_config(plot_config); - for &(exponent, label) in EXPONENTS.iter() { + let libp2p_setup = setup_workers::>(&rt); + for &(exponent, limit, label) in SMALL_PAYLOAD.iter() { let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(NOTIFICATIONS as u64 * size as u64)); - + group.throughput(Throughput::Bytes(limit as u64 * size as u64)); group.bench_with_input( BenchmarkId::new("libp2p/serially", label), - &(size, NOTIFICATIONS), + &(size, limit), |b, &(size, limit)| { - b.to_async(&rt).iter_batched_ref( - setup_workers::>, - |setup| run_serially(Arc::clone(setup), size, limit), - criterion::BatchSize::SmallInput, - ); + b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); }, ); group.bench_with_input( - BenchmarkId::new("litep2p/serially", label), - &(size, NOTIFICATIONS), + BenchmarkId::new("libp2p/with_backpressure", label), + &(size, limit), |b, &(size, limit)| { - b.to_async(&rt).iter_batched_ref( - setup_workers::, - |setup| run_serially(Arc::clone(setup), size, limit), - criterion::BatchSize::SmallInput, - ); + b.to_async(&rt) + .iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit)); }, ); + } + drop(libp2p_setup); + + let litep2p_setup = setup_workers::(&rt); + for &(exponent, limit, label) in SMALL_PAYLOAD.iter() { + let size = 2usize.pow(exponent); + group.throughput(Throughput::Bytes(limit as u64 * size as u64)); group.bench_with_input( - BenchmarkId::new("libp2p/with_backpressure", label), - &(size, NOTIFICATIONS), + BenchmarkId::new("litep2p/serially", label), + &(size, limit), |b, &(size, limit)| { - b.to_async(&rt).iter_batched_ref( - setup_workers::>, - |setup| run_with_backpressure(Arc::clone(setup), size, limit), - criterion::BatchSize::SmallInput, - ); + b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); }, ); group.bench_with_input( BenchmarkId::new("litep2p/with_backpressure", label), - &(size, NOTIFICATIONS), + &(size, limit), |b, &(size, limit)| { - b.to_async(&rt).iter_batched_ref( - setup_workers::, - |setup| run_with_backpressure(Arc::clone(setup), size, limit), - criterion::BatchSize::SmallInput, - ); + b.to_async(&rt) + .iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit)); }, ); } + drop(litep2p_setup); } -criterion_group! { - name = benches; - config = Criterion::default().sample_size(SAMPLE_SIZE); - targets = run_benchmark -} +criterion_group!(benches, run_benchmark); criterion_main!(benches); From 60a3e94e06b24f9e0af4dbed164123593113a99d Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 22 Nov 2024 15:12:53 +0100 Subject: [PATCH 05/10] Split payloads --- .../network/benches/notifications_protocol.rs | 71 ++++++++++++++++--- 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 73d1f820c71e..5cf79f90ac6a 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -43,13 +43,13 @@ const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[ (12, 100, "4KB"), (15, 100, "64KB"), ]; -// const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ -// // (Exponent of size, number of notifications, label) -// (18, 1, "256KB"), -// (21, 1, "2MB"), -// (24, 1, "16MB"), -// (27, 1, "128MB"), -// ]; +const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ + // (Exponent of size, number of notifications, label) + (18, 10, "256KB"), + (21, 10, "2MB"), + (24, 10, "16MB"), + (27, 10, "128MB"), +]; const MAX_SIZE: u64 = 2u64.pow(30); fn create_network_worker( @@ -255,10 +255,10 @@ async fn run_with_backpressure(setup: Arc, size: usize, limit: usize let _ = tokio::join!(network1, network2); } -fn run_benchmark(c: &mut Criterion) { +fn run_benchmark_with_small_payload(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group("libp2p"); + let mut group = c.benchmark_group("notifications_protocol/small_payload"); group.plot_config(plot_config); let libp2p_setup = setup_workers::>(&rt); @@ -306,5 +306,56 @@ fn run_benchmark(c: &mut Criterion) { drop(litep2p_setup); } -criterion_group!(benches, run_benchmark); +fn run_benchmark_with_large_payload(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); + let mut group = c.benchmark_group("notifications_protocol/large_payload"); + group.plot_config(plot_config); + + let libp2p_setup = setup_workers::>(&rt); + for &(exponent, limit, label) in LARGE_PAYLOAD.iter() { + let size = 2usize.pow(exponent); + group.throughput(Throughput::Bytes(limit as u64 * size as u64)); + group.bench_with_input( + BenchmarkId::new("libp2p/serially", label), + &(size, limit), + |b, &(size, limit)| { + b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); + }, + ); + group.bench_with_input( + BenchmarkId::new("libp2p/with_backpressure", label), + &(size, limit), + |b, &(size, limit)| { + b.to_async(&rt) + .iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit)); + }, + ); + } + drop(libp2p_setup); + + let litep2p_setup = setup_workers::(&rt); + for &(exponent, limit, label) in LARGE_PAYLOAD.iter() { + let size = 2usize.pow(exponent); + group.throughput(Throughput::Bytes(limit as u64 * size as u64)); + group.bench_with_input( + BenchmarkId::new("litep2p/serially", label), + &(size, limit), + |b, &(size, limit)| { + b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); + }, + ); + group.bench_with_input( + BenchmarkId::new("litep2p/with_backpressure", label), + &(size, limit), + |b, &(size, limit)| { + b.to_async(&rt) + .iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit)); + }, + ); + } + drop(litep2p_setup); +} + +criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload); criterion_main!(benches); From 871f342b1b6dd098be438fe6f21c456c96e5f362 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 25 Nov 2024 11:23:04 +0100 Subject: [PATCH 06/10] Update requests bench --- .../benches/request_response_protocol.rs | 317 ++++++++++-------- 1 file changed, 171 insertions(+), 146 deletions(-) diff --git a/substrate/client/network/benches/request_response_protocol.rs b/substrate/client/network/benches/request_response_protocol.rs index b428d0d75ac5..908eb89f4e04 100644 --- a/substrate/client/network/benches/request_response_protocol.rs +++ b/substrate/client/network/benches/request_response_protocol.rs @@ -25,46 +25,39 @@ use sc_network::{ FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role, SetConfig, }, + service::traits::NetworkService, IfDisconnected, Litep2pNetworkBackend, NetworkBackend, NetworkRequest, NetworkWorker, - NotificationMetrics, NotificationService, Roles, + NotificationMetrics, NotificationService, PeerId, Roles, }; use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT}; -use sc_network_types::build_multiaddr; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Zero}; -use std::{ - net::{IpAddr, Ipv4Addr, TcpListener}, - str::FromStr, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use substrate_test_runtime_client::runtime; +use tokio::{sync::Mutex, task::JoinHandle}; const MAX_SIZE: u64 = 2u64.pow(30); -const SAMPLE_SIZE: usize = 50; -const REQUESTS: usize = 50; -const EXPONENTS: &[(u32, &'static str)] = &[ - (6, "64B"), - (9, "512B"), - (12, "4KB"), - (15, "64KB"), - (18, "256KB"), - (21, "2MB"), - (24, "16MB"), - (27, "128MB"), +const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[ + // (Exponent of size, number of requests, label) + (6, 100, "64B"), + (9, 100, "512B"), + (12, 100, "4KB"), + (15, 100, "64KB"), +]; +const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ + // (Exponent of size, number of requests, label) + (18, 10, "256KB"), + (21, 10, "2MB"), + (24, 10, "16MB"), + (27, 10, "128MB"), ]; -fn get_listen_address() -> sc_network::Multiaddr { - let ip = Ipv4Addr::from_str("127.0.0.1").unwrap(); - let listener = TcpListener::bind((IpAddr::V4(ip), 0)).unwrap(); // Bind to a random port - let local_addr = listener.local_addr().unwrap(); - let port = local_addr.port(); - - build_multiaddr!(Ip4(ip), Tcp(port)) -} - -pub fn create_network_worker( - listen_addr: sc_network::Multiaddr, -) -> (N, async_channel::Receiver, Box) +pub fn create_network_worker() -> ( + N, + Arc, + async_channel::Receiver, + Arc>>, +) where B: BlockT + 'static, H: ExHashT, @@ -80,8 +73,7 @@ where Some(tx), ); let role = Role::Full; - let mut net_conf = NetworkConfiguration::new_local(); - net_conf.listen_addresses = vec![listen_addr]; + let net_conf = NetworkConfiguration::new_local(); let mut network_config = FullNetworkConfiguration::new(&net_conf, None); network_config.add_request_response_protocol(request_response_config); let genesis_hash = runtime::Hash::zero(); @@ -119,71 +111,126 @@ where notification_metrics: NotificationMetrics::new(None), }) .unwrap(); + let notification_service = Arc::new(Mutex::new(notification_service)); + let network_service = worker.network_service(); + + (worker, network_service, rx, notification_service) +} - (worker, rx, notification_service) +struct BenchSetup { + #[allow(dead_code)] + notification_service1: Arc>>, + #[allow(dead_code)] + notification_service2: Arc>>, + network_service1: Arc, + peer_id2: PeerId, + handle1: JoinHandle<()>, + handle2: JoinHandle<()>, + #[allow(dead_code)] + rx1: async_channel::Receiver, + rx2: async_channel::Receiver, } -async fn run_serially(size: usize, limit: usize) +impl Drop for BenchSetup { + fn drop(&mut self) { + self.handle1.abort(); + self.handle2.abort(); + } +} + +fn setup_workers(rt: &tokio::runtime::Runtime) -> Arc where B: BlockT + 'static, H: ExHashT, N: NetworkBackend, { - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); - let (worker1, _rx1, _notification_service1) = create_network_worker::(listen_address1); - let service1 = worker1.network_service().clone(); - let (worker2, rx2, _notification_service2) = - create_network_worker::(listen_address2.clone()); + let _guard = rt.enter(); + + let (worker1, network_service1, rx1, notification_service1) = + create_network_worker::(); + let (worker2, network_service2, rx2, notification_service2) = + create_network_worker::(); let peer_id2 = worker2.network_service().local_peer_id(); + let handle1 = tokio::spawn(worker1.run()); + let handle2 = tokio::spawn(worker2.run()); - worker1.network_service().add_known_address(peer_id2, listen_address2.into()); + let ready = tokio::spawn({ + let network_service1 = Arc::clone(&network_service1); - let network1_run = worker1.run(); - let network2_run = worker2.run(); - let (break_tx, break_rx) = async_channel::bounded(10); - let requests = async move { - let mut sent_counter = 0; - while sent_counter < limit { - let _ = service1 - .request( - peer_id2.into(), - "/request-response/1".into(), - vec![0; 2], - None, - IfDisconnected::TryConnect, - ) - .await - .unwrap(); - sent_counter += 1; + async move { + let listen_address2 = { + while network_service2.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + network_service2.listen_addresses()[0].clone() + }; + network_service1.add_known_address(peer_id2, listen_address2.into()); + } + }); + + tokio::task::block_in_place(|| { + let _ = tokio::runtime::Handle::current().block_on(ready); + }); + + Arc::new(BenchSetup { + notification_service1, + notification_service2, + network_service1, + peer_id2, + handle1, + handle2, + rx1, + rx2, + }) +} + +async fn run_serially(setup: Arc, size: usize, limit: usize) { + let (break_tx, break_rx) = async_channel::bounded(1); + let requests = { + let network_service1 = Arc::clone(&setup.network_service1); + let peer_id2 = setup.peer_id2; + async move { + let mut sent_counter = 0; + while sent_counter < limit { + let _ = network_service1 + .request( + peer_id2.into(), + "/request-response/1".into(), + vec![0; 2], + None, + IfDisconnected::TryConnect, + ) + .await + .unwrap(); + sent_counter += 1; + } + let _ = break_tx.send(()).await; } - let _ = break_tx.send(()).await; }; let network1 = tokio::spawn(async move { tokio::pin!(requests); - tokio::pin!(network1_run); loop { tokio::select! { - _ = &mut network1_run => {}, _ = &mut requests => break, } } }); - let network2 = tokio::spawn(async move { - tokio::pin!(network2_run); - loop { - tokio::select! { - _ = &mut network2_run => {}, - res = rx2.recv() => { - let IncomingRequest { pending_response, .. } = res.unwrap(); - pending_response.send(OutgoingResponse { - result: Ok(vec![0; size]), - reputation_changes: vec![], - sent_feedback: None, - }).unwrap(); - }, - _ = break_rx.recv() => break, + let network2 = tokio::spawn({ + let rx2 = setup.rx2.clone(); + async move { + loop { + tokio::select! { + res = rx2.recv() => { + let IncomingRequest { pending_response, .. } = res.unwrap(); + pending_response.send(OutgoingResponse { + result: Ok(vec![0; size]), + reputation_changes: vec![], + sent_feedback: None, + }).unwrap(); + }, + _ = break_rx.recv() => break, + } } } }); @@ -194,29 +241,12 @@ where // The libp2p request-response implementation does not provide any backpressure feedback. // So this benchmark is useless until we implement it for litep2p. #[allow(dead_code)] -async fn run_with_backpressure(size: usize, limit: usize) -where - B: BlockT + 'static, - H: ExHashT, - N: NetworkBackend, -{ - let listen_address1 = get_listen_address(); - let listen_address2 = get_listen_address(); - let (worker1, _rx1, _notification_service1) = create_network_worker::(listen_address1); - let service1 = worker1.network_service().clone(); - let (worker2, rx2, _notification_service2) = - create_network_worker::(listen_address2.clone()); - let peer_id2 = worker2.network_service().local_peer_id(); - - worker1.network_service().add_known_address(peer_id2, listen_address2.into()); - - let network1_run = worker1.run(); - let network2_run = worker2.run(); - let (break_tx, break_rx) = async_channel::bounded(10); +async fn run_with_backpressure(setup: Arc, size: usize, limit: usize) { + let (break_tx, break_rx) = async_channel::bounded(1); let requests = futures::future::join_all((0..limit).into_iter().map(|_| { let (tx, rx) = futures::channel::oneshot::channel(); - service1.start_request( - peer_id2.into(), + setup.network_service1.start_request( + setup.peer_id2.into(), "/request-response/1".into(), vec![0; 8], None, @@ -227,77 +257,72 @@ where })); let network1 = tokio::spawn(async move { - tokio::pin!(requests); - tokio::pin!(network1_run); - loop { - tokio::select! { - _ = &mut network1_run => {}, - responses = &mut requests => { - for res in responses { - res.unwrap().unwrap(); - } - let _ = break_tx.send(()).await; - break; - }, - } + let responses = requests.await; + for res in responses { + res.unwrap().unwrap(); } + let _ = break_tx.send(()).await; }); let network2 = tokio::spawn(async move { - tokio::pin!(network2_run); - loop { - tokio::select! { - _ = &mut network2_run => {}, - res = rx2.recv() => { - let IncomingRequest { pending_response, .. } = res.unwrap(); - pending_response.send(OutgoingResponse { - result: Ok(vec![0; size]), - reputation_changes: vec![], - sent_feedback: None, - }).unwrap(); - }, - _ = break_rx.recv() => break, - } + for _ in 0..limit { + let IncomingRequest { pending_response, .. } = setup.rx2.recv().await.unwrap(); + pending_response + .send(OutgoingResponse { + result: Ok(vec![0; size]), + reputation_changes: vec![], + sent_feedback: None, + }) + .unwrap(); } + break_rx.recv().await }); let _ = tokio::join!(network1, network2); } -fn run_benchmark(c: &mut Criterion) { +fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group("request_response_benchmark"); + let mut group = c.benchmark_group(group); group.plot_config(plot_config); - for &(exponent, label) in EXPONENTS.iter() { + let libp2p_setup = setup_workers::>(&rt); + for &(exponent, limit, label) in payload.iter() { let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(REQUESTS as u64 * size as u64)); + group.throughput(Throughput::Bytes(limit as u64 * size as u64)); group.bench_with_input( BenchmarkId::new("libp2p/serially", label), - &(size, REQUESTS), + &(size, limit), |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_serially::>(size, limit) - }); - }, - ); - group.bench_with_input( - BenchmarkId::new("litep2p/serially", label), - &(size, REQUESTS), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| { - run_serially::( - size, limit, - ) - }); + b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); }, ); } + drop(libp2p_setup); + + // TODO: NetworkRequest::request should be implemented for Litep2pNetworkService + let litep2p_setup = setup_workers::(&rt); + // for &(exponent, limit, label) in payload.iter() { + // let size = 2usize.pow(exponent); + // group.throughput(Throughput::Bytes(limit as u64 * size as u64)); + // group.bench_with_input( + // BenchmarkId::new("litep2p/serially", label), + // &(size, limit), + // |b, &(size, limit)| { + // b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); + // }, + // ); + // } + drop(litep2p_setup); } -criterion_group! { - name = benches; - config = Criterion::default().sample_size(SAMPLE_SIZE); - targets = run_benchmark +fn run_benchmark_with_small_payload(c: &mut Criterion) { + run_benchmark(c, SMALL_PAYLOAD, "request_response_benchmark/small_payload"); } + +fn run_benchmark_with_large_payload(c: &mut Criterion) { + run_benchmark(c, LARGE_PAYLOAD, "request_response_benchmark/large_payload"); +} + +criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload); criterion_main!(benches); From 1d7e6a7f1d72d5b23bcb41b762b7b049dcc7e834 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 25 Nov 2024 11:23:22 +0100 Subject: [PATCH 07/10] Update notifications bench --- .../network/benches/notifications_protocol.rs | 61 +++---------------- 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 5cf79f90ac6a..40a810d616b5 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -255,14 +255,14 @@ async fn run_with_backpressure(setup: Arc, size: usize, limit: usize let _ = tokio::join!(network1, network2); } -fn run_benchmark_with_small_payload(c: &mut Criterion) { +fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group("notifications_protocol/small_payload"); + let mut group = c.benchmark_group(group); group.plot_config(plot_config); let libp2p_setup = setup_workers::>(&rt); - for &(exponent, limit, label) in SMALL_PAYLOAD.iter() { + for &(exponent, limit, label) in payload.iter() { let size = 2usize.pow(exponent); group.throughput(Throughput::Bytes(limit as u64 * size as u64)); group.bench_with_input( @@ -284,7 +284,7 @@ fn run_benchmark_with_small_payload(c: &mut Criterion) { drop(libp2p_setup); let litep2p_setup = setup_workers::(&rt); - for &(exponent, limit, label) in SMALL_PAYLOAD.iter() { + for &(exponent, limit, label) in payload.iter() { let size = 2usize.pow(exponent); group.throughput(Throughput::Bytes(limit as u64 * size as u64)); group.bench_with_input( @@ -306,55 +306,12 @@ fn run_benchmark_with_small_payload(c: &mut Criterion) { drop(litep2p_setup); } -fn run_benchmark_with_large_payload(c: &mut Criterion) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group("notifications_protocol/large_payload"); - group.plot_config(plot_config); - - let libp2p_setup = setup_workers::>(&rt); - for &(exponent, limit, label) in LARGE_PAYLOAD.iter() { - let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - group.bench_with_input( - BenchmarkId::new("libp2p/serially", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); - }, - ); - group.bench_with_input( - BenchmarkId::new("libp2p/with_backpressure", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt) - .iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit)); - }, - ); - } - drop(libp2p_setup); +fn run_benchmark_with_small_payload(c: &mut Criterion) { + run_benchmark(c, SMALL_PAYLOAD, "notifications_protocol/small_payload"); +} - let litep2p_setup = setup_workers::(&rt); - for &(exponent, limit, label) in LARGE_PAYLOAD.iter() { - let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - group.bench_with_input( - BenchmarkId::new("litep2p/serially", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); - }, - ); - group.bench_with_input( - BenchmarkId::new("litep2p/with_backpressure", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt) - .iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit)); - }, - ); - } - drop(litep2p_setup); +fn run_benchmark_with_large_payload(c: &mut Criterion) { + run_benchmark(c, LARGE_PAYLOAD, "notifications_protocol/large_payload"); } criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload); From be4bbb89303158c16db134974f8d4ae290b4abcf Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 25 Nov 2024 12:35:16 +0000 Subject: [PATCH 08/10] Update from AndreiEres running command 'prdoc --audience node_dev' --- prdoc/pr_6636.prdoc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 prdoc/pr_6636.prdoc diff --git a/prdoc/pr_6636.prdoc b/prdoc/pr_6636.prdoc new file mode 100644 index 000000000000..d53afc47f9d0 --- /dev/null +++ b/prdoc/pr_6636.prdoc @@ -0,0 +1,17 @@ +title: Optimize initialization of networking protocol benchmarks +doc: +- audience: Node Dev + description: |- + # Description + These changes should enhance the quality of benchmark results by excluding worker initialization time from the measurements and reducing the overall duration of the benchmarks. + + ### Integration + It should not affect any downstream projects. + + ### Review Notes + - Workers initialize once per benchmark to avoid side effects. + - The listen address is assigned when a worker starts. + - Benchmarks are divided into two groups by size to create better charts for comparison. +crates: +- name: sc-network + bump: major From 763ce3e935775ee23db4792912af5d75e3115cf6 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 25 Nov 2024 14:04:19 +0100 Subject: [PATCH 09/10] Fix prdoc --- prdoc/pr_6636.prdoc | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/prdoc/pr_6636.prdoc b/prdoc/pr_6636.prdoc index d53afc47f9d0..1db5fd54d971 100644 --- a/prdoc/pr_6636.prdoc +++ b/prdoc/pr_6636.prdoc @@ -2,16 +2,8 @@ title: Optimize initialization of networking protocol benchmarks doc: - audience: Node Dev description: |- - # Description These changes should enhance the quality of benchmark results by excluding worker initialization time from the measurements and reducing the overall duration of the benchmarks. - ### Integration - It should not affect any downstream projects. - - ### Review Notes - - Workers initialize once per benchmark to avoid side effects. - - The listen address is assigned when a worker starts. - - Benchmarks are divided into two groups by size to create better charts for comparison. crates: - name: sc-network - bump: major + validate: false From 5a960950f38a5001719b4ec41be944b93ce4c955 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 25 Nov 2024 16:45:25 +0100 Subject: [PATCH 10/10] Fix clippy --- .../network/benches/request_response_protocol.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/substrate/client/network/benches/request_response_protocol.rs b/substrate/client/network/benches/request_response_protocol.rs index 908eb89f4e04..85381112b753 100644 --- a/substrate/client/network/benches/request_response_protocol.rs +++ b/substrate/client/network/benches/request_response_protocol.rs @@ -186,12 +186,11 @@ where async fn run_serially(setup: Arc, size: usize, limit: usize) { let (break_tx, break_rx) = async_channel::bounded(1); - let requests = { + let network1 = tokio::spawn({ let network_service1 = Arc::clone(&setup.network_service1); let peer_id2 = setup.peer_id2; async move { - let mut sent_counter = 0; - while sent_counter < limit { + for _ in 0..limit { let _ = network_service1 .request( peer_id2.into(), @@ -202,19 +201,9 @@ async fn run_serially(setup: Arc, size: usize, limit: usize) { ) .await .unwrap(); - sent_counter += 1; } let _ = break_tx.send(()).await; } - }; - - let network1 = tokio::spawn(async move { - tokio::pin!(requests); - loop { - tokio::select! { - _ = &mut requests => break, - } - } }); let network2 = tokio::spawn({ let rx2 = setup.rx2.clone();