From 5ccf50ce37813e30b44df6e4b641ea6985eddcd5 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 9 Nov 2023 16:08:36 +0200 Subject: [PATCH 01/11] feat(node): Implement sessions --- celestia/src/native.rs | 7 +- node/src/executor.rs | 5 +- node/src/header_ex.rs | 140 ++++++++++++++++++++++++++++------- node/src/header_ex/client.rs | 77 ++++++++++--------- node/src/header_ex/utils.rs | 2 +- node/src/lib.rs | 1 + node/src/p2p.rs | 12 +-- node/src/session.rs | 136 ++++++++++++++++++++++++++++++++++ 8 files changed, 301 insertions(+), 79 deletions(-) create mode 100644 node/src/session.rs diff --git a/celestia/src/native.rs b/celestia/src/native.rs index ca880c34..b682de41 100644 --- a/celestia/src/native.rs +++ b/celestia/src/native.rs @@ -57,14 +57,17 @@ pub async fn run() -> Result<()> { let network_id = network_id(args.network).to_owned(); let genesis_hash = network_genesis(args.network)?; + info!("Initializing store"); + let store = if let Some(db_path) = args.store { SledStore::new_in_path(db_path).await? } else { SledStore::new(network_id.clone()).await? }; - if let Ok(store_height) = store.head_height().await { - info!("Initialised store with head height: {store_height}"); + match store.head_height().await { + Ok(height) => info!("Initialised store with head height: {height}"), + Err(_) => info!("Initialised new store"), } let node = Node::new(NodeConfig { diff --git a/node/src/executor.rs b/node/src/executor.rs index 5bb8bb1a..0a0bd4ee 100644 --- a/node/src/executor.rs +++ b/node/src/executor.rs @@ -107,13 +107,12 @@ mod imp { #[derive(Debug)] pub(crate) struct Elapsed; - #[allow(dead_code)] pub(crate) fn timeout(duration: Duration, future: F) -> Timeout where F: Future, { let millis = u32::try_from(duration.as_millis().max(1)).unwrap_or(u32::MAX); - let delay = TimeoutFuture::new(millis); + let delay = SendWrapper::new(TimeoutFuture::new(millis)); Timeout { value: future, @@ -134,7 +133,7 @@ mod imp { #[pin] value: T, #[pin] - delay: TimeoutFuture, + delay: SendWrapper, } impl Future for Timeout where diff --git a/node/src/header_ex.rs b/node/src/header_ex.rs index 867cadd2..7be2cc39 100644 --- a/node/src/header_ex.rs +++ b/node/src/header_ex.rs @@ -1,29 +1,30 @@ use std::io; use std::sync::Arc; use std::task::{Context, Poll}; -use tracing::warn; use async_trait::async_trait; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use instant::{Duration, Instant}; use libp2p::{ core::Endpoint, request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport}, swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandlerInEvent, + handler::ConnectionEvent, ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, + ConnectionId, FromSwarm, NetworkBehaviour, SubstreamProtocol, THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, StreamProtocol, }; use prost::Message; -use tracing::debug; -use tracing::instrument; +use tracing::{debug, instrument, warn}; mod client; mod server; -mod utils; +pub(crate) mod utils; +use crate::executor::timeout; use crate::header_ex::client::HeaderExClientHandler; use crate::header_ex::server::HeaderExServerHandler; use crate::p2p::P2pError; @@ -33,14 +34,21 @@ use crate::utils::{protocol_id, OneshotResultSender}; /// Max request size in bytes const REQUEST_SIZE_MAXIMUM: usize = 1024; +/// Time limit on reading/writing a request +const REQUEST_TIME_LIMIT: Duration = Duration::from_secs(1); /// Max response size in bytes const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024; +/// Time limit on reading/writing a response +const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(5); +/// Substream negotiation timeout +const NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(1); type RequestType = HeaderRequest; type ResponseType = Vec; type ReqRespBehaviour = request_response::Behaviour; type ReqRespEvent = request_response::Event; type ReqRespMessage = request_response::Message; +type ReqRespConnectionHandler = ::ConnectionHandler; pub(crate) struct HeaderExBehaviour where @@ -176,7 +184,7 @@ impl NetworkBehaviour for HeaderExBehaviour where S: Store + 'static, { - type ConnectionHandler = ::ConnectionHandler; + type ConnectionHandler = ConnHandler; type ToSwarm = (); fn handle_established_inbound_connection( @@ -186,12 +194,9 @@ where local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result { - self.req_resp.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) + self.req_resp + .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr) + .map(|handler| ConnHandler(handler)) } fn handle_established_outbound_connection( @@ -201,12 +206,9 @@ where addr: &Multiaddr, role_override: Endpoint, ) -> Result { - self.req_resp.handle_established_outbound_connection( - connection_id, - peer, - addr, - role_override, - ) + self.req_resp + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + .map(|handler| ConnHandler(handler)) } fn handle_pending_inbound_connection( @@ -274,6 +276,62 @@ where } } +pub(crate) struct ConnHandler(ReqRespConnectionHandler); + +impl ConnectionHandler for ConnHandler { + type ToBehaviour = ::ToBehaviour; + type FromBehaviour = ::FromBehaviour; + type InboundProtocol = ::InboundProtocol; + type InboundOpenInfo = ::InboundOpenInfo; + type OutboundProtocol = ::OutboundProtocol; + type OutboundOpenInfo = ::OutboundOpenInfo; + + fn listen_protocol(&self) -> SubstreamProtocol { + self.0.listen_protocol().with_timeout(NEGOTIATION_TIMEOUT) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + match self.0.poll(cx) { + Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { + Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.with_timeout(NEGOTIATION_TIMEOUT), + }) + } + ev => ev, + } + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + self.0.on_behaviour_event(event) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + '_, + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + self.0.on_connection_event(event) + } + + fn connection_keep_alive(&self) -> bool { + self.0.connection_keep_alive() + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_close(cx) + } +} + #[derive(Clone, Copy, Debug, Default)] pub(crate) struct HeaderCodec; @@ -287,14 +345,19 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let data = read_up_to(io, REQUEST_SIZE_MAXIMUM).await?; + let data = read_up_to(io, REQUEST_SIZE_MAXIMUM, REQUEST_TIME_LIMIT).await?; if data.len() >= REQUEST_SIZE_MAXIMUM { debug!("Message filled the whole buffer (len: {})", data.len()); } - parse_header_request(&data) - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid request")) + parse_header_request(&data).ok_or_else(|| { + // We have two cases that can reach here: + // + // 1. The request is invalid + // 2. The request is incomplete because of the size limit or time limit + io::Error::new(io::ErrorKind::Other, "invalid or incomplete request") + }) } async fn read_response( @@ -305,7 +368,7 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let data = read_up_to(io, RESPONSE_SIZE_MAXIMUM).await?; + let data = read_up_to(io, RESPONSE_SIZE_MAXIMUM, RESPONSE_TIME_LIMIT).await?; if data.len() >= RESPONSE_SIZE_MAXIMUM { debug!("Message filled the whole buffer (len: {})", data.len()); @@ -320,7 +383,14 @@ impl Codec for HeaderCodec { } if msgs.is_empty() { - return Err(io::Error::new(io::ErrorKind::Other, "invalid response")); + // We have two cases that can reach here: + // + // 1. The response is invalid + // 2. The response is incomplete because of the size limit or time limit + return Err(io::Error::new( + io::ErrorKind::Other, + "invalid or incomplete response", + )); } Ok(msgs) @@ -339,7 +409,9 @@ impl Codec for HeaderCodec { let _ = req.encode_length_delimited(&mut buf); - io.write_all(&buf).await?; + timeout(REQUEST_TIME_LIMIT, io.write_all(&buf)) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "writing request timed out"))??; Ok(()) } @@ -364,18 +436,22 @@ impl Codec for HeaderCodec { } } - io.write_all(&buf).await?; + timeout(RESPONSE_TIME_LIMIT, io.write_all(&buf)) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "writing response timed out"))??; Ok(()) } } -async fn read_up_to(io: &mut T, limit: usize) -> io::Result> +/// Reads up to `size_limit` within `time_limit`. +async fn read_up_to(io: &mut T, size_limit: usize, time_limit: Duration) -> io::Result> where T: AsyncRead + Unpin + Send, { - let mut buf = vec![0u8; limit]; + let mut buf = vec![0u8; size_limit]; let mut read_len = 0; + let now = Instant::now(); loop { if read_len == buf.len() { @@ -383,7 +459,15 @@ where break; } - let len = io.read(&mut buf[read_len..]).await?; + let Some(time_limit) = time_limit.checked_sub(now.elapsed()) else { + break; + }; + + let len = match timeout(time_limit, io.read(&mut buf[read_len..])).await { + Ok(Ok(len)) => len, + Ok(Err(e)) => return Err(e), + Err(_) => break, + }; if len == 0 { // EOF diff --git a/node/src/header_ex/client.rs b/node/src/header_ex/client.rs index a6de5893..00c9af4a 100644 --- a/node/src/header_ex/client.rs +++ b/node/src/header_ex/client.rs @@ -4,18 +4,15 @@ use std::fmt::Debug; use std::hash::Hash; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; use celestia_proto::p2p::pb::header_request::Data; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; use futures::future::join_all; -use instant::Instant; use libp2p::request_response::{OutboundFailure, OutboundRequestId}; use libp2p::PeerId; -use smallvec::SmallVec; use tokio::sync::oneshot; -use tracing::{instrument, trace, warn}; +use tracing::{debug, instrument, trace}; use crate::executor::{spawn, yield_now}; use crate::header_ex::utils::{HeaderRequestExt, HeaderResponseExt}; @@ -25,7 +22,6 @@ use crate::peer_tracker::PeerTracker; use crate::utils::{OneshotResultSender, OneshotResultSenderExt, VALIDATIONS_PER_YIELD}; const MAX_PEERS: usize = 10; -const TIMEOUT: Duration = Duration::from_secs(10); pub(super) struct HeaderExClientHandler where @@ -38,7 +34,6 @@ where struct State { request: HeaderRequest, respond_to: OneshotResultSender, P2pError>, - started_at: Instant, } pub(super) trait RequestSender { @@ -108,7 +103,6 @@ where let state = State { request, respond_to, - started_at: Instant::now(), }; self.reqs.insert(req_id, state); @@ -139,7 +133,6 @@ where let state = State { request: request.clone(), respond_to: tx, - started_at: Instant::now(), }; self.reqs.insert(req_id, state); @@ -220,14 +213,14 @@ where }); } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "debug", skip(self))] pub(super) fn on_failure( &mut self, peer: PeerId, request_id: S::RequestId, error: OutboundFailure, ) { - trace!("Outbound failure"); + debug!("Outbound failure"); if let Some(state) = self.reqs.remove(&request_id) { state @@ -236,31 +229,7 @@ where } } - #[instrument(skip_all)] - fn prune_expired_requests(&mut self) { - let mut expired_reqs = SmallVec::<[_; 32]>::new(); - - for (req_id, state) in self.reqs.iter() { - if state.started_at.elapsed() >= TIMEOUT { - expired_reqs.push(*req_id); - } - } - - if !expired_reqs.is_empty() { - warn!("{} requests timed out", expired_reqs.len()); - } - - for req_id in expired_reqs { - if let Some(state) = self.reqs.remove(&req_id) { - state - .respond_to - .maybe_send_err(HeaderExError::OutboundFailure(OutboundFailure::Timeout)); - } - } - } - pub(super) fn poll(&mut self, _cx: &mut Context) -> Poll<()> { - self.prune_expired_requests(); Poll::Pending } } @@ -282,10 +251,16 @@ async fn decode_and_verify_responses( let mut headers = Vec::with_capacity(responses.len()); - for responses in responses.chunks(VALIDATIONS_PER_YIELD) { + 'outer: for responses in responses.chunks(VALIDATIONS_PER_YIELD) { for response in responses { - // Unmarshal and validate - let header = response.to_extended_header()?; + // Unmarshal and validate. Propagate error only if nothing + // was decoded before. + let header = match response.to_extended_header() { + Ok(header) => header, + Err(e) if headers.is_empty() => return Err(e), + Err(_) => break 'outer, + }; + trace!("Header: {header:?}"); headers.push(header); } @@ -443,6 +418,34 @@ mod tests { assert_eq!(result, expected_headers); } + #[async_test] + async fn request_range_responds_with_invalid_headaer_in_the_middle() { + let peer_tracker = peer_tracker_with_n_peers(15); + let mut mock_req = MockReq::new(); + let mut handler = HeaderExClientHandler::::new(peer_tracker); + + let (tx, rx) = oneshot::channel(); + + handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 5), tx); + + let mut gen = ExtendedHeaderGenerator::new_from_height(5); + let mut headers = gen.next_many(5); + + invalidate(&mut headers[2]); + let expected_headers = &headers[..2]; + + let responses = headers + .iter() + .map(|header| header.to_header_response()) + .collect::>(); + + mock_req.send_n_responses(&mut handler, 1, responses); + + let result = rx.await.unwrap().unwrap(); + assert_eq!(result.len(), 2); + assert_eq!(result, expected_headers); + } + #[async_test] async fn request_range_responds_with_not_found() { let peer_tracker = peer_tracker_with_n_peers(15); diff --git a/node/src/header_ex/utils.rs b/node/src/header_ex/utils.rs index 3e5fae1a..b121c7b2 100644 --- a/node/src/header_ex/utils.rs +++ b/node/src/header_ex/utils.rs @@ -7,7 +7,7 @@ use tendermint_proto::Protobuf; use crate::header_ex::HeaderExError; -pub(super) trait HeaderRequestExt { +pub(crate) trait HeaderRequestExt { fn with_origin(origin: u64, amount: u64) -> HeaderRequest; fn with_hash(hash: Hash) -> HeaderRequest; fn head_request() -> HeaderRequest; diff --git a/node/src/lib.rs b/node/src/lib.rs index 49fc710b..c888865e 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -3,6 +3,7 @@ mod header_ex; pub mod node; pub mod p2p; pub mod peer_tracker; +mod session; pub mod store; mod swarm; pub mod syncer; diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 261f14c7..d18331fa 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -23,11 +23,11 @@ use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tracing::{debug, info, instrument, trace, warn}; -use crate::executor::spawn; -use crate::executor::Interval; +use crate::executor::{spawn, Interval}; use crate::header_ex::{HeaderExBehaviour, HeaderExConfig}; use crate::peer_tracker::PeerTracker; use crate::peer_tracker::PeerTrackerInfo; +use crate::session::Session; use crate::store::Store; use crate::swarm::new_swarm; use crate::utils::{ @@ -288,12 +288,8 @@ where let height = from.height().value() + 1; - let headers = self - .header_ex_request(HeaderRequest { - data: Some(header_request::Data::Origin(height)), - amount, - }) - .await?; + let mut session = Session::new(height, amount, self.cmd_tx.clone())?; + let headers = session.run().await?; from.verify_adjacent_range(&headers) .map_err(|_| HeaderExError::InvalidResponse)?; diff --git a/node/src/session.rs b/node/src/session.rs new file mode 100644 index 00000000..568ced1e --- /dev/null +++ b/node/src/session.rs @@ -0,0 +1,136 @@ +use celestia_proto::p2p::pb::HeaderRequest; +use celestia_types::ExtendedHeader; +use tokio::sync::{mpsc, oneshot}; +use tracing::debug; + +use crate::executor::spawn; +use crate::header_ex::utils::HeaderRequestExt; +use crate::p2p::{P2pCmd, P2pError}; + +const MAX_AMOUNT_PER_REQ: u64 = 64; +const MAX_CONCURRENT_REQS: usize = 8; + +type Result = std::result::Result; + +pub(crate) struct Session { + next_height: u64, + remaining_amount: u64, + cmd_tx: mpsc::Sender, + response_tx: mpsc::Sender<(u64, u64, Result>)>, + response_rx: mpsc::Receiver<(u64, u64, Result>)>, + ongoing: usize, +} + +impl Session { + pub(crate) fn new(from_height: u64, amount: u64, cmd_tx: mpsc::Sender) -> Result { + if from_height < 1 || amount < 1 { + todo!(); + } + + let (response_tx, response_rx) = mpsc::channel(8); + + Ok(Session { + next_height: from_height, + remaining_amount: amount, + cmd_tx, + response_tx, + response_rx, + ongoing: 0, + }) + } + + pub(crate) async fn run(&mut self) -> Result> { + let mut responses = Vec::new(); + + for _ in 0..MAX_CONCURRENT_REQS { + if self.remaining_amount == 0 { + break; + } + + self.send_next_request().await?; + } + + while self.ongoing > 0 { + let (height, requested_amount, res) = self.recv_response().await; + + match res { + Ok(headers) => { + let headers_len = headers.len() as u64; + + responses.push(headers); + + // Reschedule the missing sub-range + if headers_len < requested_amount { + let height = height + headers_len; + let amount = requested_amount - headers_len; + self.send_request(height, amount).await?; + } + } + Err(P2pError::HeaderEx(e)) => { + debug!("HeaderEx error: {e}"); + self.send_request(height, requested_amount).await?; + } + Err(e) => return Err(e), + } + } + + let mut headers = responses + .into_iter() + .flat_map(|headers| headers.into_iter()) + .collect::>(); + + headers.sort_unstable_by_key(|header| header.height().value()); + + Ok(headers) + } + + async fn recv_response(&mut self) -> (u64, u64, Result>) { + let (height, requested_amount, res) = + self.response_rx.recv().await.expect("channel never closes"); + self.ongoing -= 1; + (height, requested_amount, res) + } + + pub(crate) async fn send_next_request(&mut self) -> Result<()> { + if self.remaining_amount == 0 { + return Ok(()); + } + + let amount = self.remaining_amount.min(MAX_AMOUNT_PER_REQ); + self.send_request(self.next_height, amount).await?; + + self.next_height += amount; + self.remaining_amount -= amount; + + Ok(()) + } + + pub(crate) async fn send_request(&mut self, height: u64, amount: u64) -> Result<()> { + debug!("Fetching batch {} until {}", height, height + amount - 1); + + let request = HeaderRequest::with_origin(height, amount); + let (tx, rx) = oneshot::channel(); + + self.cmd_tx + .send(P2pCmd::HeaderExRequest { + request, + respond_to: tx, + }) + .await + .map_err(|_| P2pError::WorkerDied)?; + + let response_tx = self.response_tx.clone(); + + spawn(async move { + let result = match rx.await { + Ok(result) => result, + Err(_) => Err(P2pError::WorkerDied), + }; + let _ = response_tx.send((height, amount, result)).await; + }); + + self.ongoing += 1; + + Ok(()) + } +} From a87c213fe3ea96295198106c19e06c920ef71b97 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 10 Nov 2023 16:26:03 +0200 Subject: [PATCH 02/11] fix clippy --- node/src/header_ex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/header_ex.rs b/node/src/header_ex.rs index 7be2cc39..03c3be62 100644 --- a/node/src/header_ex.rs +++ b/node/src/header_ex.rs @@ -196,7 +196,7 @@ where ) -> Result { self.req_resp .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr) - .map(|handler| ConnHandler(handler)) + .map(ConnHandler) } fn handle_established_outbound_connection( @@ -208,7 +208,7 @@ where ) -> Result { self.req_resp .handle_established_outbound_connection(connection_id, peer, addr, role_override) - .map(|handler| ConnHandler(handler)) + .map(ConnHandler) } fn handle_pending_inbound_connection( From 9fda990071d57598c981f19567f60fdc1d3755bf Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 13 Nov 2023 13:26:48 +0200 Subject: [PATCH 03/11] fix tests --- node/src/syncer.rs | 95 ++++++++++++++++++++++++++++++++--------- node/tests/header_ex.rs | 29 ++++++------- 2 files changed, 87 insertions(+), 37 deletions(-) diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 27b575c7..043d6308 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -612,13 +612,21 @@ mod tests { assert_syncing(&syncer, &store, 30, 1058).await; // Syncer requested the first batch ([31, 542]) - let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; - assert_eq!(height, 31); - assert_eq!(amount, 512); - respond_to - .send(Ok(headers.drain(..512).collect())) - .map_err(|_| "headers [31, 542]") - .unwrap(); + handle_session_batch( + &mut p2p_mock, + &mut headers, + vec![ + (31, 64), + (95, 64), + (159, 64), + (223, 64), + (287, 64), + (351, 64), + (415, 64), + (479, 64), + ], + ) + .await; assert_syncing(&syncer, &store, 542, 1058).await; // Still syncing, but new HEAD arrived (height 1059) @@ -627,13 +635,21 @@ mod tests { assert_syncing(&syncer, &store, 542, 1059).await; // Syncer requested the second batch ([543, 1054]) - let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; - assert_eq!(height, 543); - assert_eq!(amount, 512); - respond_to - .send(Ok(headers.drain(..512).collect())) - .map_err(|_| "headers [543, 1054]") - .unwrap(); + handle_session_batch( + &mut p2p_mock, + &mut headers, + vec![ + (543, 64), + (607, 64), + (671, 64), + (735, 64), + (799, 64), + (863, 64), + (927, 64), + (991, 64), + ], + ) + .await; assert_syncing(&syncer, &store, 1054, 1059).await; // Syncer requested the last batch ([1055, 1059]) @@ -682,13 +698,21 @@ mod tests { assert_syncing(&syncer, &store, 25, 545).await; // Syncer requested the first batch ([26, 537]) - let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; - assert_eq!(height, 26); - assert_eq!(amount, 512); - respond_to - .send(Ok(headers.drain(..512).collect())) - .map_err(|_| "headers [26, 537]") - .unwrap(); + handle_session_batch( + &mut p2p_mock, + &mut headers, + vec![ + (26, 64), + (90, 64), + (154, 64), + (218, 64), + (282, 64), + (346, 64), + (410, 64), + (474, 64), + ], + ) + .await; assert_syncing(&syncer, &store, 537, 545).await; // Syncer requested the last batch ([538, 545]) @@ -826,4 +850,33 @@ mod tests { (syncer, store, handle) } + + async fn handle_session_batch( + p2p_mock: &mut MockP2pHandle, + remaining_headers: &mut Vec, + mut requests: Vec<(u64, u64)>, + ) { + let first_start = requests[0].0; + + for _ in 0..8 { + let (height, amount, respond_to) = + p2p_mock.expect_header_request_for_height_cmd().await; + + let pos = requests + .iter() + .position(|x| *x == (height, amount)) + .expect("invalid request"); + requests.remove(pos); + + let start = (height - first_start) as usize; + let end = (height - first_start + amount) as usize; + respond_to + .send(Ok(remaining_headers[start..end].to_vec())) + .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1)) + .unwrap(); + } + + // Remove already sent batch + remaining_headers.drain(..512); + } } diff --git a/node/tests/header_ex.rs b/node/tests/header_ex.rs index 97c6f876..a0fdb5f5 100644 --- a/node/tests/header_ex.rs +++ b/node/tests/header_ex.rs @@ -9,7 +9,7 @@ use celestia_node::{ test_utils::{gen_filled_store, listening_test_node_config, test_node_config}, }; use celestia_types::test_utils::{invalidate, unverify}; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; use crate::utils::new_connected_node; @@ -112,13 +112,12 @@ async fn client_server() { assert_eq!(server_headers[1..], received_all_headers); // reqest more headers than available in store - // TODO: this reflects _current_ behaviour. once sessions are implemented it'll keep retrying - // and this test will need to be changed - let partial_response = client - .request_verified_headers(&received_genesis, 20) - .await - .unwrap(); - assert_eq!(partial_response.len(), 19); + timeout( + Duration::from_millis(200), + client.request_verified_headers(&received_genesis, 20), + ) + .await + .expect_err("sessions keep retrying until all headers are received"); // request unknown hash let unstored_header = header_generator.next_of(&server_headers[0]); @@ -342,14 +341,12 @@ async fn invalidated_header_server_store() { client.wait_connected().await.unwrap(); - let invalidated_header_in_range = client - .request_verified_headers(&server_headers[9], 5) - .await - .unwrap_err(); - assert!(matches!( - invalidated_header_in_range, - NodeError::P2p(P2pError::HeaderEx(HeaderExError::InvalidResponse)) - )); + timeout( + Duration::from_millis(200), + client.request_verified_headers(&server_headers[9], 5), + ) + .await + .expect_err("session never stops retrying on invalid header"); let requested_from_invalidated_header = client .request_verified_headers(&server_headers[10], 3) From 84198fdf45062c7836beeca2bfbe57ca360c2c07 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 13 Nov 2023 13:37:09 +0200 Subject: [PATCH 04/11] return error on invalid parameters --- node/src/session.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/node/src/session.rs b/node/src/session.rs index 568ced1e..00c5a085 100644 --- a/node/src/session.rs +++ b/node/src/session.rs @@ -5,7 +5,7 @@ use tracing::debug; use crate::executor::spawn; use crate::header_ex::utils::HeaderRequestExt; -use crate::p2p::{P2pCmd, P2pError}; +use crate::p2p::{HeaderExError, P2pCmd, P2pError}; const MAX_AMOUNT_PER_REQ: u64 = 64; const MAX_CONCURRENT_REQS: usize = 8; @@ -24,7 +24,7 @@ pub(crate) struct Session { impl Session { pub(crate) fn new(from_height: u64, amount: u64, cmd_tx: mpsc::Sender) -> Result { if from_height < 1 || amount < 1 { - todo!(); + return Err(P2pError::HeaderEx(HeaderExError::InvalidRequest)); } let (response_tx, response_rx) = mpsc::channel(8); @@ -87,7 +87,9 @@ impl Session { async fn recv_response(&mut self) -> (u64, u64, Result>) { let (height, requested_amount, res) = self.response_rx.recv().await.expect("channel never closes"); + self.ongoing -= 1; + (height, requested_amount, res) } From 7875c6d8a3ff51b5c86d5464b5bf425ac2af40fb Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 13 Nov 2023 16:25:52 +0200 Subject: [PATCH 05/11] add tests --- node/src/p2p.rs | 3 +- node/src/session.rs | 139 ++++++++++++++++++++++++++++++++++++++++- node/src/test_utils.rs | 2 + 3 files changed, 142 insertions(+), 2 deletions(-) diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 4694aa0e..bbaeec53 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -162,7 +162,7 @@ where let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default()); let p2p = P2p { - cmd_tx, + cmd_tx: cmd_tx.clone(), header_sub_watcher: header_sub_rx, peer_tracker_info_watcher: peer_tracker_rx, local_peer_id: PeerId::random(), @@ -170,6 +170,7 @@ where }; let handle = crate::test_utils::MockP2pHandle { + cmd_tx, cmd_rx, header_sub_tx, peer_tracker_tx, diff --git a/node/src/session.rs b/node/src/session.rs index 00c5a085..eb5157df 100644 --- a/node/src/session.rs +++ b/node/src/session.rs @@ -59,11 +59,14 @@ impl Session { responses.push(headers); - // Reschedule the missing sub-range if headers_len < requested_amount { + // Reschedule the missing sub-range let height = height + headers_len; let amount = requested_amount - headers_len; self.send_request(height, amount).await?; + } else { + // Schedule next request + self.send_next_request().await?; } } Err(P2pError::HeaderEx(e)) => { @@ -136,3 +139,137 @@ impl Session { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::executor::spawn; + use crate::p2p::P2p; + use crate::store::InMemoryStore; + use celestia_types::test_utils::ExtendedHeaderGenerator; + + #[cfg(not(target_arch = "wasm32"))] + use tokio::test as async_test; + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test as async_test; + + #[async_test] + async fn retry_on_missing_range() { + let (_p2p, mut p2p_mock) = P2p::::mocked(); + let mut gen = ExtendedHeaderGenerator::new(); + let headers = gen.next_many(64); + + let mut session = Session::new(1, 64, p2p_mock.cmd_tx.clone()).unwrap(); + let (result_tx, result_rx) = oneshot::channel(); + spawn(async move { + let res = session.run().await; + result_tx.send(res).unwrap(); + }); + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 1); + assert_eq!(amount, 64); + respond_to.send(Ok(headers[..60].to_vec())).unwrap(); + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 61); + assert_eq!(amount, 4); + respond_to.send(Ok(headers[60..64].to_vec())).unwrap(); + + p2p_mock.expect_no_cmd().await; + + let received_headers = result_rx.await.unwrap().unwrap(); + assert_eq!(headers, received_headers); + } + + #[async_test] + async fn nine_batches() { + let (_p2p, mut p2p_mock) = P2p::::mocked(); + let mut gen = ExtendedHeaderGenerator::new(); + let headers = gen.next_many(520); + + let mut session = Session::new(1, 520, p2p_mock.cmd_tx.clone()).unwrap(); + let (result_tx, result_rx) = oneshot::channel(); + spawn(async move { + let res = session.run().await; + result_tx.send(res).unwrap(); + }); + + for i in 0..8 { + let (height, amount, respond_to) = + p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 1 + 64 * i); + assert_eq!(amount, 64); + let start = (height - 1) as usize; + let end = start + amount as usize; + respond_to.send(Ok(headers[start..end].to_vec())).unwrap(); + } + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 513); + assert_eq!(amount, 8); + let start = (height - 1) as usize; + let end = start + amount as usize; + respond_to.send(Ok(headers[start..end].to_vec())).unwrap(); + + p2p_mock.expect_no_cmd().await; + + let received_headers = result_rx.await.unwrap().unwrap(); + assert_eq!(headers, received_headers); + } + + #[async_test] + async fn not_found_is_not_fatal() { + let (_p2p, mut p2p_mock) = P2p::::mocked(); + let mut gen = ExtendedHeaderGenerator::new(); + let headers = gen.next_many(64); + + let mut session = Session::new(1, 64, p2p_mock.cmd_tx.clone()).unwrap(); + let (result_tx, result_rx) = oneshot::channel(); + spawn(async move { + let res = session.run().await; + result_tx.send(res).unwrap(); + }); + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 1); + assert_eq!(amount, 64); + respond_to + .send(Err(P2pError::HeaderEx(HeaderExError::HeaderNotFound))) + .unwrap(); + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 1); + assert_eq!(amount, 64); + respond_to.send(Ok(headers.clone())).unwrap(); + + p2p_mock.expect_no_cmd().await; + + let received_headers = result_rx.await.unwrap().unwrap(); + assert_eq!(headers, received_headers); + } + + #[async_test] + async fn no_peers_is_fatal() { + let (_p2p, mut p2p_mock) = P2p::::mocked(); + + let mut session = Session::new(1, 64, p2p_mock.cmd_tx.clone()).unwrap(); + let (result_tx, result_rx) = oneshot::channel(); + spawn(async move { + let res = session.run().await; + result_tx.send(res).unwrap(); + }); + + let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; + assert_eq!(height, 1); + assert_eq!(amount, 64); + respond_to.send(Err(P2pError::NoConnectedPeers)).unwrap(); + + p2p_mock.expect_no_cmd().await; + + assert!(matches!( + result_rx.await, + Ok(Err(P2pError::NoConnectedPeers)) + )); + } +} diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 7ec71b85..27e06666 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -56,6 +56,8 @@ pub fn test_node_config_with_keypair(keypair: Keypair) -> NodeConfig, pub(crate) cmd_rx: mpsc::Receiver, pub(crate) header_sub_tx: watch::Sender>, pub(crate) peer_tracker_tx: watch::Sender, From 3a939ac24b8b626ec793ffd295aafe967bec64c8 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 13:00:37 +0200 Subject: [PATCH 06/11] rename constants --- node/src/header_ex.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/node/src/header_ex.rs b/node/src/header_ex.rs index 03c3be62..46ddf0c8 100644 --- a/node/src/header_ex.rs +++ b/node/src/header_ex.rs @@ -32,12 +32,12 @@ use crate::peer_tracker::PeerTracker; use crate::store::Store; use crate::utils::{protocol_id, OneshotResultSender}; -/// Max request size in bytes -const REQUEST_SIZE_MAXIMUM: usize = 1024; +/// Size limit of a request in bytes +const REQUEST_SIZE_LIMIT: usize = 1024; /// Time limit on reading/writing a request const REQUEST_TIME_LIMIT: Duration = Duration::from_secs(1); -/// Max response size in bytes -const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024; +/// Size limit of a response in bytes +const RESPONSE_SIZE_LIMIT: usize = 10 * 1024 * 1024; /// Time limit on reading/writing a response const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(5); /// Substream negotiation timeout @@ -345,9 +345,9 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let data = read_up_to(io, REQUEST_SIZE_MAXIMUM, REQUEST_TIME_LIMIT).await?; + let data = read_up_to(io, REQUEST_SIZE_LIMIT, REQUEST_TIME_LIMIT).await?; - if data.len() >= REQUEST_SIZE_MAXIMUM { + if data.len() >= REQUEST_SIZE_LIMIT { debug!("Message filled the whole buffer (len: {})", data.len()); } @@ -368,9 +368,9 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let data = read_up_to(io, RESPONSE_SIZE_MAXIMUM, RESPONSE_TIME_LIMIT).await?; + let data = read_up_to(io, RESPONSE_SIZE_LIMIT, RESPONSE_TIME_LIMIT).await?; - if data.len() >= RESPONSE_SIZE_MAXIMUM { + if data.len() >= RESPONSE_SIZE_LIMIT { debug!("Message filled the whole buffer (len: {})", data.len()); } @@ -405,7 +405,7 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - let mut buf = Vec::with_capacity(REQUEST_SIZE_MAXIMUM); + let mut buf = Vec::with_capacity(REQUEST_SIZE_LIMIT); let _ = req.encode_length_delimited(&mut buf); @@ -425,7 +425,7 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - let mut buf = Vec::with_capacity(RESPONSE_SIZE_MAXIMUM); + let mut buf = Vec::with_capacity(RESPONSE_SIZE_LIMIT); for resp in resps { if resp.encode_length_delimited(&mut buf).is_err() { @@ -595,7 +595,7 @@ mod tests { #[async_test] async fn test_decode_header_request_too_large() { - let too_long_message_len = REQUEST_SIZE_MAXIMUM + 1; + let too_long_message_len = REQUEST_SIZE_LIMIT + 1; let mut length_delimiter_buffer = BytesMut::new(); prost::encode_length_delimiter(too_long_message_len, &mut length_delimiter_buffer).unwrap(); let mut reader = Cursor::new(length_delimiter_buffer); @@ -613,7 +613,7 @@ mod tests { #[async_test] async fn test_decode_header_response_too_large() { - let too_long_message_len = RESPONSE_SIZE_MAXIMUM + 1; + let too_long_message_len = RESPONSE_SIZE_LIMIT + 1; let mut length_delimiter_buffer = BytesMut::new(); encode_length_delimiter(too_long_message_len, &mut length_delimiter_buffer).unwrap(); let mut reader = Cursor::new(length_delimiter_buffer); From b63c89b4cc13f4dcbdde5d80e0a6cb5907d81d1a Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 14:42:06 +0200 Subject: [PATCH 07/11] Update node/src/session.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Maciej Zwoliński Signed-off-by: Yiannis Marangos --- node/src/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/session.rs b/node/src/session.rs index eb5157df..fa09d422 100644 --- a/node/src/session.rs +++ b/node/src/session.rs @@ -79,7 +79,7 @@ impl Session { let mut headers = responses .into_iter() - .flat_map(|headers| headers.into_iter()) + .flatten() .collect::>(); headers.sort_unstable_by_key(|header| header.height().value()); From 442402fa7fca88755e6de37763b390e4c253613b Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 14:43:07 +0200 Subject: [PATCH 08/11] Update node/src/session.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Maciej Zwoliński Signed-off-by: Yiannis Marangos --- node/src/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/session.rs b/node/src/session.rs index fa09d422..004f46eb 100644 --- a/node/src/session.rs +++ b/node/src/session.rs @@ -27,7 +27,7 @@ impl Session { return Err(P2pError::HeaderEx(HeaderExError::InvalidRequest)); } - let (response_tx, response_rx) = mpsc::channel(8); + let (response_tx, response_rx) = mpsc::channel(MAX_CONCURRENT_REQS); Ok(Session { next_height: from_height, From 6fb16fa00ae1a77d6e9a3460777078d83f40361a Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 15:10:45 +0200 Subject: [PATCH 09/11] fix batch handling in tests --- node/src/syncer.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 4c91bced..fe71149d 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -858,8 +858,9 @@ mod tests { mut requests: Vec<(u64, u64)>, ) { let first_start = requests[0].0; + let mut sum_amount = 0; - for _ in 0..8 { + for _ in 0..requests.len() { let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; @@ -875,9 +876,11 @@ mod tests { .send(Ok(remaining_headers[start..end].to_vec())) .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1)) .unwrap(); + + sum_amount += amount; } // Remove already sent batch - remaining_headers.drain(..512); + remaining_headers.drain(..sum_amount as usize); } } From d183e743126fffb5345539e84d929905796714c7 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 15:14:29 +0200 Subject: [PATCH 10/11] fix wording in comments --- node/src/header_ex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/header_ex.rs b/node/src/header_ex.rs index 46ddf0c8..b4dd9172 100644 --- a/node/src/header_ex.rs +++ b/node/src/header_ex.rs @@ -352,7 +352,7 @@ impl Codec for HeaderCodec { } parse_header_request(&data).ok_or_else(|| { - // We have two cases that can reach here: + // There are two cases that can reach here: // // 1. The request is invalid // 2. The request is incomplete because of the size limit or time limit @@ -383,7 +383,7 @@ impl Codec for HeaderCodec { } if msgs.is_empty() { - // We have two cases that can reach here: + // There are two cases that can reach here: // // 1. The response is invalid // 2. The response is incomplete because of the size limit or time limit From a6d87455c6539b61640238911815b296a6850f3a Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 14 Nov 2023 15:27:17 +0200 Subject: [PATCH 11/11] fmt --- node/src/session.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/node/src/session.rs b/node/src/session.rs index 004f46eb..1323daf3 100644 --- a/node/src/session.rs +++ b/node/src/session.rs @@ -77,10 +77,7 @@ impl Session { } } - let mut headers = responses - .into_iter() - .flatten() - .collect::>(); + let mut headers = responses.into_iter().flatten().collect::>(); headers.sort_unstable_by_key(|header| header.height().value());