Skip to content

Commit

Permalink
fix(network): Add tcp_nodelay
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewWestberg committed Dec 20, 2023
1 parent 1ed2161 commit 3a16264
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 48 deletions.
2 changes: 2 additions & 0 deletions examples/n2n-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pallas = { path = "../../pallas" }
net2 = "0.2.37"
hex = "0.4.3"
log = "0.4.16"
thiserror = "1.0.31"
futures = "0.3.29"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tokio = { version = "1.27.0", features = ["rt-multi-thread"] }
203 changes: 156 additions & 47 deletions examples/n2n-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,186 @@
use pallas::network::{
use pallas::{network::{
facades::PeerClient,
miniprotocols::{chainsync, Point, MAINNET_MAGIC},
};
use tokio::time::Instant;
use tracing::info;

async fn do_blockfetch(peer: &mut PeerClient) {
let range = (
Point::Specific(
43847831,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
Point::Specific(
43847844,
hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4")
.unwrap(),
),
);

let blocks = peer.blockfetch().fetch_range(range).await.unwrap();

for block in blocks {
info!("received block of size: {}", block.len());
miniprotocols::{chainsync, Point, MAINNET_MAGIC, blockfetch, keepalive},
}, ledger::traverse::MultiEraHeader};
use tokio::{time::Instant, select};
use thiserror::Error;
use futures::{future::FutureExt, pin_mut};

#[derive(Error, Debug)]
pub enum Error {
#[error("hex conversion error")]
FromHexError(#[from] hex::FromHexError),

#[error("blockfetch error")]
BlockFetchError(#[from] blockfetch::ClientError),

#[error("chainsync error")]
ChainSyncError(#[from] chainsync::ClientError),

#[error("keepalive error")]
KeepAliveError(#[from] keepalive::Error),

#[error("pallas_traverse error")]
PallasTraverseError(#[from] pallas::ledger::traverse::Error),
}

async fn do_blockfetch(blockfetch_client: &mut blockfetch::Client, range: (Point, Point)) -> Result<(), Error> {
let blocks = blockfetch_client.fetch_range(range.clone()).await?;

for block in &blocks {
tracing::trace!("received block of size: {}", block.len());
}
tracing::info!("received {} blocks. last slot: {}", blocks.len(), range.1.slot_or_default());
Ok(())
}

async fn do_chainsync(peer: &mut PeerClient) {
async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_client: &mut blockfetch::Client) -> Result<(), Error> {
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")?,
)];

let (point, _) = peer.chainsync().find_intersect(known_points).await.unwrap();
let (point, _) = chainsync_client.find_intersect(known_points).await?;

info!("intersected point is {:?}", point);
tracing::info!("intersected point is {:?}", point);

let mut keepalive_timer = Instant::now();
for _ in 0..10 {
if keepalive_timer.elapsed().as_secs() > 20 {
peer.keepalive().send_keepalive().await.unwrap();
keepalive_timer = Instant::now();
}
let next = peer.chainsync().request_next().await.unwrap();
let mut block_count = 0u16;
let mut start_point = Point::Specific(0, vec![]);
let mut end_point: Point;
let mut next_log = Instant::now();
loop {
let next = chainsync_client.request_next().await?;

match next {
chainsync::NextResponse::RollForward(h, _) => {
log::info!("rolling forward, header size: {}", h.cbor.len())
tracing::trace!("rolling forward, header size: {}", h.cbor.len());
let point = match h.byron_prefix {
None => {
let multi_era_header = MultiEraHeader::decode(h.variant, None, &h.cbor)?;
let slot = multi_era_header.slot();
let hash = multi_era_header.hash().to_vec();
let number = multi_era_header.number();
match &multi_era_header {
MultiEraHeader::EpochBoundary(_) => {
tracing::info!("epoch boundary");
None
},
MultiEraHeader::AlonzoCompatible(_) | MultiEraHeader::Babbage(_) => {
if next_log.elapsed().as_secs() > 1 {
tracing::info!("chainsync block header: {}", number);
next_log = Instant::now();
}
Some(Point::Specific(slot, hash))
},
MultiEraHeader::Byron(_) => {
tracing::info!("ignoring byron header");
None
},
}
}
Some(_) => {
tracing::info!("skipping byron block");
None
}
};
match point {
Some(p) => {
block_count += 1;
if block_count == 1 {
start_point = p;
}
else if block_count == 10 {
end_point = p;
do_blockfetch(blockfetch_client, (start_point.clone(), end_point.clone())).await?;
block_count = 0;
}
},
None => {},
};
}
chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x),
chainsync::NextResponse::Await => log::info!("tip of chaing reached"),
chainsync::NextResponse::Await => tracing::info!("tip of chaing reached"),
};
}
}

async fn do_keepalive(keepalive_client: &mut keepalive::Client) -> Result<(), Error> {
let mut keepalive_timer = Instant::now();
loop {
if keepalive_timer.elapsed().as_secs() > 20 {
tracing::info!("sending keepalive...");
keepalive_client.send_keepalive().await?;
tracing::info!("keepalive sent");
keepalive_timer = Instant::now();
}
}
}

#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.with_max_level(tracing::Level::INFO)
.finish(),
)
.unwrap();

// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
.await
.unwrap();
loop {
// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let server = "backbone.cardano-mainnet.iohk.io:3001";
// let server = "localhost:6000";
let mut peer = PeerClient::connect(server, MAINNET_MAGIC)
.await
.unwrap();

// fetch an arbitrary batch of block
do_blockfetch(&mut peer).await;
let chainsync_handle = tokio::spawn(async move {
do_chainsync(&mut peer.chainsync, &mut peer.blockfetch).await?;
Ok::<(), Error>(())
}).fuse();
let keepalive_handle = tokio::spawn(async move {
do_keepalive(&mut peer.keepalive).await?;
Ok::<(), Error>(())
}).fuse();

// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut peer).await;
pin_mut!(chainsync_handle, keepalive_handle);

// If any of these concurrent tasks exit or fail, the others are canceled.
select! {
chainsync_result = chainsync_handle => {
match chainsync_result {
Ok(result) => {
match result {
Ok(_) => {}
Err(error) => {
tracing::error!("chainsync error: {:?}", error);
}
}
}
Err(error) => {
tracing::error!("chainsync error: {:?}", error);
}
}
}
keepalive_result = keepalive_handle => {
match keepalive_result {
Ok(result) => {
match result {
Ok(_) => {}
Err(error) => {
tracing::error!("keepalive error: {:?}", error);
}
}
}
Err(error) => {
tracing::error!("keepalive error: {:?}", error);
}
}
}
}
peer.plexer_handle.abort();

tracing::info!("waiting 10 seconds before reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}
1 change: 1 addition & 0 deletions pallas-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ itertools = "0.10.5"
pallas-codec = { version = "=0.20.0", path = "../pallas-codec" }
pallas-crypto = { version = "=0.20.0", path = "../pallas-crypto" }
rand = "0.8.5"
socket2 = "0.5.5"
thiserror = "1.0.31"
tokio = { version = "1", features = ["rt", "net", "io-util", "time", "sync", "macros"] }
tracing = "0.1.37"
Expand Down
11 changes: 10 additions & 1 deletion pallas-network/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ const BUFFER_LEN: usize = 1024 * 10;
impl Bearer {
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, tokio::io::Error> {
let stream = TcpStream::connect(addr).await?;
// add tcp_keepalive
let sock_ref = socket2::SockRef::from(&stream);
let mut tcp_keepalive = socket2::TcpKeepalive::new();
tcp_keepalive = tcp_keepalive.with_time(tokio::time::Duration::from_secs(20));
tcp_keepalive = tcp_keepalive.with_interval(tokio::time::Duration::from_secs(20));
let _ = sock_ref.set_tcp_keepalive(&tcp_keepalive);
// add tcp_nodelay
let _ = sock_ref.set_nodelay(true);

Ok(Self::Tcp(stream))
}

Expand Down Expand Up @@ -353,7 +362,7 @@ impl Plexer {
clock: Instant::now(),
bearer: SegmentBuffer::new(bearer),
ingress: tokio::sync::mpsc::channel(100), // TODO: define buffer
egress: tokio::sync::broadcast::channel(100),
egress: tokio::sync::broadcast::channel(100000),
}
}

Expand Down

0 comments on commit 3a16264

Please sign in to comment.