From 3a16264250e978ed8a0fb4b3899b1deb55db4923 Mon Sep 17 00:00:00 2001 From: Andrew Westberg Date: Wed, 20 Dec 2023 16:15:48 +0000 Subject: [PATCH] fix(network): Add tcp_nodelay --- examples/n2n-miniprotocols/Cargo.toml | 2 + examples/n2n-miniprotocols/src/main.rs | 203 +++++++++++++++++++------ pallas-network/Cargo.toml | 1 + pallas-network/src/multiplexer.rs | 11 +- 4 files changed, 169 insertions(+), 48 deletions(-) diff --git a/examples/n2n-miniprotocols/Cargo.toml b/examples/n2n-miniprotocols/Cargo.toml index 353d2b0a..38aa5279 100644 --- a/examples/n2n-miniprotocols/Cargo.toml +++ b/examples/n2n-miniprotocols/Cargo.toml @@ -11,6 +11,8 @@ pallas = { path = "../../pallas" } net2 = "0.2.37" hex = "0.4.3" log = "0.4.16" +thiserror = "1.0.31" +futures = "0.3.29" tracing = "0.1.37" tracing-subscriber = "0.3.16" tokio = { version = "1.27.0", features = ["rt-multi-thread"] } diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index f3d06d94..7c5f1877 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -1,77 +1,186 @@ -use pallas::network::{ +use pallas::{network::{ facades::PeerClient, - miniprotocols::{chainsync, Point, MAINNET_MAGIC}, -}; -use tokio::time::Instant; -use tracing::info; - -async fn do_blockfetch(peer: &mut PeerClient) { - let range = ( - Point::Specific( - 43847831, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") - .unwrap(), - ), - Point::Specific( - 43847844, - hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4") - .unwrap(), - ), - ); - - let blocks = peer.blockfetch().fetch_range(range).await.unwrap(); - - for block in blocks { - info!("received block of size: {}", block.len()); + miniprotocols::{chainsync, Point, MAINNET_MAGIC, blockfetch, keepalive}, +}, ledger::traverse::MultiEraHeader}; +use tokio::{time::Instant, select}; +use thiserror::Error; +use futures::{future::FutureExt, pin_mut}; + +#[derive(Error, Debug)] +pub enum Error { + #[error("hex conversion error")] + FromHexError(#[from] hex::FromHexError), + + #[error("blockfetch error")] + BlockFetchError(#[from] blockfetch::ClientError), + + #[error("chainsync error")] + ChainSyncError(#[from] chainsync::ClientError), + + #[error("keepalive error")] + KeepAliveError(#[from] keepalive::Error), + + #[error("pallas_traverse error")] + PallasTraverseError(#[from] pallas::ledger::traverse::Error), +} + +async fn do_blockfetch(blockfetch_client: &mut blockfetch::Client, range: (Point, Point)) -> Result<(), Error> { + let blocks = blockfetch_client.fetch_range(range.clone()).await?; + + for block in &blocks { + tracing::trace!("received block of size: {}", block.len()); } + tracing::info!("received {} blocks. last slot: {}", blocks.len(), range.1.slot_or_default()); + Ok(()) } -async fn do_chainsync(peer: &mut PeerClient) { +async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_client: &mut blockfetch::Client) -> Result<(), Error> { let known_points = vec![Point::Specific( 43847831u64, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")?, )]; - let (point, _) = peer.chainsync().find_intersect(known_points).await.unwrap(); + let (point, _) = chainsync_client.find_intersect(known_points).await?; - info!("intersected point is {:?}", point); + tracing::info!("intersected point is {:?}", point); - let mut keepalive_timer = Instant::now(); - for _ in 0..10 { - if keepalive_timer.elapsed().as_secs() > 20 { - peer.keepalive().send_keepalive().await.unwrap(); - keepalive_timer = Instant::now(); - } - let next = peer.chainsync().request_next().await.unwrap(); + let mut block_count = 0u16; + let mut start_point = Point::Specific(0, vec![]); + let mut end_point: Point; + let mut next_log = Instant::now(); + loop { + let next = chainsync_client.request_next().await?; match next { chainsync::NextResponse::RollForward(h, _) => { - log::info!("rolling forward, header size: {}", h.cbor.len()) + tracing::trace!("rolling forward, header size: {}", h.cbor.len()); + let point = match h.byron_prefix { + None => { + let multi_era_header = MultiEraHeader::decode(h.variant, None, &h.cbor)?; + let slot = multi_era_header.slot(); + let hash = multi_era_header.hash().to_vec(); + let number = multi_era_header.number(); + match &multi_era_header { + MultiEraHeader::EpochBoundary(_) => { + tracing::info!("epoch boundary"); + None + }, + MultiEraHeader::AlonzoCompatible(_) | MultiEraHeader::Babbage(_) => { + if next_log.elapsed().as_secs() > 1 { + tracing::info!("chainsync block header: {}", number); + next_log = Instant::now(); + } + Some(Point::Specific(slot, hash)) + }, + MultiEraHeader::Byron(_) => { + tracing::info!("ignoring byron header"); + None + }, + } + } + Some(_) => { + tracing::info!("skipping byron block"); + None + } + }; + match point { + Some(p) => { + block_count += 1; + if block_count == 1 { + start_point = p; + } + else if block_count == 10 { + end_point = p; + do_blockfetch(blockfetch_client, (start_point.clone(), end_point.clone())).await?; + block_count = 0; + } + }, + None => {}, + }; } chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x), - chainsync::NextResponse::Await => log::info!("tip of chaing reached"), + chainsync::NextResponse::Await => tracing::info!("tip of chaing reached"), }; } } +async fn do_keepalive(keepalive_client: &mut keepalive::Client) -> Result<(), Error> { + let mut keepalive_timer = Instant::now(); + loop { + if keepalive_timer.elapsed().as_secs() > 20 { + tracing::info!("sending keepalive..."); + keepalive_client.send_keepalive().await?; + tracing::info!("keepalive sent"); + keepalive_timer = Instant::now(); + } + } +} + #[tokio::main] async fn main() { tracing::subscriber::set_global_default( tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::TRACE) + .with_max_level(tracing::Level::INFO) .finish(), ) .unwrap(); - // setup a TCP socket to act as data bearer between our agents and the remote - // relay. - let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC) - .await - .unwrap(); + loop { + // setup a TCP socket to act as data bearer between our agents and the remote + // relay. + let server = "backbone.cardano-mainnet.iohk.io:3001"; + // let server = "localhost:6000"; + let mut peer = PeerClient::connect(server, MAINNET_MAGIC) + .await + .unwrap(); - // fetch an arbitrary batch of block - do_blockfetch(&mut peer).await; + let chainsync_handle = tokio::spawn(async move { + do_chainsync(&mut peer.chainsync, &mut peer.blockfetch).await?; + Ok::<(), Error>(()) + }).fuse(); + let keepalive_handle = tokio::spawn(async move { + do_keepalive(&mut peer.keepalive).await?; + Ok::<(), Error>(()) + }).fuse(); - // execute the chainsync flow from an arbitrary point in the chain - do_chainsync(&mut peer).await; + pin_mut!(chainsync_handle, keepalive_handle); + + // If any of these concurrent tasks exit or fail, the others are canceled. + select! { + chainsync_result = chainsync_handle => { + match chainsync_result { + Ok(result) => { + match result { + Ok(_) => {} + Err(error) => { + tracing::error!("chainsync error: {:?}", error); + } + } + } + Err(error) => { + tracing::error!("chainsync error: {:?}", error); + } + } + } + keepalive_result = keepalive_handle => { + match keepalive_result { + Ok(result) => { + match result { + Ok(_) => {} + Err(error) => { + tracing::error!("keepalive error: {:?}", error); + } + } + } + Err(error) => { + tracing::error!("keepalive error: {:?}", error); + } + } + } + } + peer.plexer_handle.abort(); + + tracing::info!("waiting 10 seconds before reconnecting..."); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + } } diff --git a/pallas-network/Cargo.toml b/pallas-network/Cargo.toml index 381d33ea..45b06a40 100644 --- a/pallas-network/Cargo.toml +++ b/pallas-network/Cargo.toml @@ -17,6 +17,7 @@ itertools = "0.10.5" pallas-codec = { version = "=0.20.0", path = "../pallas-codec" } pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" } rand = "0.8.5" +socket2 = "0.5.5" thiserror = "1.0.31" tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] } tracing = "0.1.37" diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index acbc3758..e69802b0 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -77,6 +77,15 @@ const BUFFER_LEN: usize = 1024 * 10; impl Bearer { pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result { let stream = TcpStream::connect(addr).await?; + // add tcp_keepalive + let sock_ref = socket2::SockRef::from(&stream); + let mut tcp_keepalive = socket2::TcpKeepalive::new(); + tcp_keepalive = tcp_keepalive.with_time(tokio::time::Duration::from_secs(20)); + tcp_keepalive = tcp_keepalive.with_interval(tokio::time::Duration::from_secs(20)); + let _ = sock_ref.set_tcp_keepalive(&tcp_keepalive); + // add tcp_nodelay + let _ = sock_ref.set_nodelay(true); + Ok(Self::Tcp(stream)) } @@ -353,7 +362,7 @@ impl Plexer { clock: Instant::now(), bearer: SegmentBuffer::new(bearer), ingress: tokio::sync::mpsc::channel(100), // TODO: define buffer - egress: tokio::sync::broadcast::channel(100), + egress: tokio::sync::broadcast::channel(100000), } }