From 16b4de5ad0bf034e85d3104d341d9cb369a5033f Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 15:38:04 +0300 Subject: [PATCH 01/10] feat: Improve performance of Exchange We sacrifice memory usage for performance. --- Cargo.lock | 9 +- node/Cargo.toml | 2 + node/src/exchange.rs | 260 ++++++++++++++++++------------------------- 3 files changed, 115 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e90ee59d..a1327361 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,9 +532,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" dependencies = [ "serde", ] @@ -591,6 +591,7 @@ dependencies = [ "tendermint-proto", "thiserror", "tokio", + "tokio-util", "tracing", "wasm-bindgen-futures", ] @@ -4032,9 +4033,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", diff --git a/node/Cargo.toml b/node/Cargo.toml index 4a1bd578..e798870e 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -10,6 +10,7 @@ celestia-types = { workspace = true } tendermint-proto = { workspace = true } async-trait = "0.1.73" +bytes = "1.5.0" dashmap = "5.5.3" futures = "0.3.28" instant = "0.1.12" @@ -27,6 +28,7 @@ rand = "0.8.5" smallvec = { version = "1.11.1", features = ["union", "const_generics"] } thiserror = "1.0.48" tokio = { version = "1.32.0", features = ["macros", "sync"] } +tokio-util = { version = "0.7.9", features = ["compat"] } tracing 
= "0.1.37" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 24690793..33abf715 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -1,11 +1,13 @@ use std::io; use std::sync::Arc; use std::task::{Context, Poll}; +use tracing::warn; use async_trait::async_trait; +use bytes::{BufMut, BytesMut}; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport}, @@ -15,7 +17,10 @@ use libp2p::{ }, Multiaddr, PeerId, StreamProtocol, }; -use prost::{length_delimiter_len, Message}; +use prost::Message; +use tokio::io::AsyncReadExt; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use tracing::debug; use tracing::instrument; mod client; @@ -33,8 +38,6 @@ use crate::utils::{protocol_id, OneshotResultSender}; const REQUEST_SIZE_MAXIMUM: usize = 1024; /// Max response size in bytes const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024; -/// Maximum length of the protobuf length delimiter in bytes -const PROTOBUF_MAX_LENGTH_DELIMITER_LEN: usize = 10; type RequestType = HeaderRequest; type ResponseType = Vec; @@ -278,96 +281,6 @@ where #[derive(Clone, Copy, Debug, Default)] pub(crate) struct HeaderCodec; -#[derive(Debug, thiserror::Error, PartialEq)] -pub enum ReadHeaderError { - #[error("stream closed while trying to get header length")] - StreamClosed, - #[error("varint overflow")] - VarintOverflow, - #[error("request too large: {0}")] - ResponseTooLarge(usize), -} - -impl HeaderCodec { - async fn read_message( - reader: &mut R, - buf: &mut Vec, - max_len: usize, - ) -> io::Result> - where - R: AsyncRead + Unpin + Send, - T: Message + Default, - { - let mut read_len = buf.len(); // buf might have data from previous iterations - - 
if read_len < 512 { - // resize to increase the chance of reading all the data in one go - buf.resize(512, 0) - } - - let data_len = loop { - if let Ok(len) = prost::decode_length_delimiter(&buf[..read_len]) { - break len; - } - - if read_len >= PROTOBUF_MAX_LENGTH_DELIMITER_LEN { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - ReadHeaderError::VarintOverflow, - )); - } - - match reader.read(&mut buf[read_len..]).await? { - 0 => { - // check if we're between Messages, in which case it's ok to stop - if read_len == 0 { - return Ok(None); - } else { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - ReadHeaderError::StreamClosed, - )); - } - } - n => read_len += n, - }; - }; - - // truncate buffer to the data that was actually read_len - buf.truncate(read_len); - - let length_delimiter_len = length_delimiter_len(data_len); - let single_message_len = length_delimiter_len + data_len; - - if data_len > max_len { - return Err(io::Error::new( - io::ErrorKind::Other, - ReadHeaderError::ResponseTooLarge(data_len), - )); - } - - if read_len < single_message_len { - // we need to read_len more - buf.resize(single_message_len, 0); - reader - .read_exact(&mut buf[read_len..single_message_len]) - .await?; - } - - let val = T::decode(&buf[length_delimiter_len..single_message_len])?; - - // we've read_len past one message when trying to get length delimiter, need to handle - // partially read_len data in the buffer - if single_message_len < read_len { - buf.drain(..single_message_len); - } else { - buf.clear(); - } - - Ok(Some(val)) - } -} - #[async_trait] impl Codec for HeaderCodec { type Protocol = StreamProtocol; @@ -378,13 +291,19 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let mut buf = Vec::new(); + let mut io = io.compat(); + let mut buf = BytesMut::with_capacity(REQUEST_SIZE_MAXIMUM).limit(REQUEST_SIZE_MAXIMUM); - HeaderCodec::read_message(io, &mut buf, REQUEST_SIZE_MAXIMUM) - .await? 
- .ok_or_else(|| { - io::Error::new(io::ErrorKind::UnexpectedEof, ReadHeaderError::StreamClosed) - }) + while let Ok(len) = io.read_buf(&mut buf).await { + if len == 0 { + break; + } + } + + let data = &buf.into_inner()[..]; + + parse_header_request(data) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid request")) } async fn read_response( @@ -395,18 +314,28 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let mut messages = vec![]; - let mut buf = Vec::new(); - loop { - match HeaderCodec::read_message(io, &mut buf, RESPONSE_SIZE_MAXIMUM).await { - Ok(None) => break, - Ok(Some(msg)) => messages.push(msg), - Err(e) => { - return Err(e); - } - }; + let mut io = io.compat(); + let mut buf = BytesMut::with_capacity(RESPONSE_SIZE_MAXIMUM).limit(RESPONSE_SIZE_MAXIMUM); + + while let Ok(len) = io.read_buf(&mut buf).await { + if len == 0 { + break; + } + } + + let mut data = &buf.into_inner()[..]; + let mut msgs = Vec::new(); + + while let Some((header, rest)) = parse_header_response(data) { + msgs.push(header); + data = rest; } - Ok(messages) + + if msgs.is_empty() { + return Err(io::Error::new(io::ErrorKind::Other, "invalid response")); + } + + Ok(msgs) } async fn write_request( @@ -418,9 +347,11 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - let data = req.encode_length_delimited_to_vec(); + let mut buf = BytesMut::with_capacity(REQUEST_SIZE_MAXIMUM); - io.write_all(data.as_ref()).await?; + let _ = req.encode_length_delimited(&mut buf); + + io.write_all(&buf).await?; Ok(()) } @@ -434,19 +365,67 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - for resp in resps { - let data = resp.encode_length_delimited_to_vec(); + let mut buf = BytesMut::with_capacity(RESPONSE_SIZE_MAXIMUM); - io.write_all(&data).await?; + for resp in resps { + if resp.encode_length_delimited(&mut buf).is_err() { + // Error on encoding means the buffer is full. + // We will send a partial response back. 
+ break; + } } + io.write_all(&buf).await?; + Ok(()) } } +fn parse_delimiter(mut buf: &[u8]) -> Option<(usize, &[u8])> { + if buf.is_empty() { + return None; + } + + let Ok(len) = prost::decode_length_delimiter(&mut buf) else { + return None; + }; + + Some((len, buf)) +} + +fn parse_header_response(buf: &[u8]) -> Option<(HeaderResponse, &[u8])> { + let (len, rest) = parse_delimiter(buf)?; + + if rest.len() < len { + debug!("Message is too long: {len}"); + return None; + } + + let Ok(msg) = HeaderResponse::decode(&rest[..len]) else { + return None; + }; + + Some((msg, &rest[len..])) +} + +fn parse_header_request(buf: &[u8]) -> Option { + let (len, rest) = parse_delimiter(buf)?; + + if rest.len() < len { + debug!("Message is too long: {len}"); + return None; + } + + let Ok(msg) = HeaderRequest::decode(&rest[..len]) else { + return None; + }; + + Some(msg) +} + #[cfg(test)] mod tests { - use super::{HeaderCodec, ReadHeaderError, REQUEST_SIZE_MAXIMUM, RESPONSE_SIZE_MAXIMUM}; + use super::*; use bytes::BytesMut; use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest, HeaderResponse}; use futures::io::{AsyncRead, AsyncReadExt, Cursor, Error}; @@ -512,8 +491,7 @@ mod tests { async fn test_decode_header_request_too_large() { let too_long_message_len = REQUEST_SIZE_MAXIMUM + 1; let mut length_delimiter_buffer = BytesMut::new(); - prost::encode_length_delimiter(REQUEST_SIZE_MAXIMUM + 1, &mut length_delimiter_buffer) - .unwrap(); + prost::encode_length_delimiter(too_long_message_len, &mut length_delimiter_buffer).unwrap(); let mut reader = Cursor::new(length_delimiter_buffer); let stream_protocol = StreamProtocol::new("/foo/bar/v0.1"); @@ -525,15 +503,6 @@ mod tests { .expect_err("expected error for too large request"); assert_eq!(decoding_error.kind(), ErrorKind::Other); - let inner_err = decoding_error - .get_ref() - .unwrap() - .downcast_ref::() - .unwrap(); - assert_eq!( - inner_err, - &ReadHeaderError::ResponseTooLarge(too_long_message_len) - ); } 
#[tokio::test] @@ -552,19 +521,10 @@ mod tests { .expect_err("expected error for too large request"); assert_eq!(decoding_error.kind(), ErrorKind::Other); - let inner_err = decoding_error - .get_ref() - .unwrap() - .downcast_ref::() - .unwrap(); - assert_eq!( - inner_err, - &ReadHeaderError::ResponseTooLarge(too_long_message_len) - ); } - #[tokio::test] - async fn test_invalid_varint() { + #[test] + fn test_invalid_varint() { // 10 consecutive bytes with continuation bit set + 1 byte, which is longer than allowed // for length delimiter let varint = [ @@ -580,21 +540,17 @@ mod tests { 0b1000_0000, 0b0000_0001, ]; - let mut reader = Cursor::new(varint); - let mut buf = vec![]; - let decoding_error = - HeaderCodec::read_message::<_, HeaderRequest>(&mut reader, &mut buf, 512) - .await - .expect_err("expected varint overflow"); + assert_eq!(parse_delimiter(&varint), None); + } - assert_eq!(decoding_error.kind(), ErrorKind::InvalidData); - let inner_err = decoding_error - .get_ref() - .unwrap() - .downcast_ref::() - .unwrap(); - assert_eq!(inner_err, &ReadHeaderError::VarintOverflow); + #[test] + fn parse_trailing_zero_varint() { + let varint = [0b1000_0001, 0b0000_0000, 0b1111_1111]; + assert!(matches!(parse_delimiter(&varint), Some((1, [255])))); + + let varint = [0b1000_0000, 0b1000_0000, 0b1000_0000, 0b0000_0000]; + assert!(matches!(parse_delimiter(&varint), Some((0, [])))); } #[tokio::test] From 17bcd422f493632fd4d027c401bb70e6299342d4 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 16:29:04 +0300 Subject: [PATCH 02/10] implement read_up_to --- Cargo.lock | 1 - node/Cargo.toml | 2 -- node/src/exchange.rs | 65 ++++++++++++++++++++++++++------------------ 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1327361..761e3511 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,7 +591,6 @@ dependencies = [ "tendermint-proto", "thiserror", "tokio", - "tokio-util", "tracing", "wasm-bindgen-futures", ] diff 
--git a/node/Cargo.toml b/node/Cargo.toml index e798870e..4a1bd578 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -10,7 +10,6 @@ celestia-types = { workspace = true } tendermint-proto = { workspace = true } async-trait = "0.1.73" -bytes = "1.5.0" dashmap = "5.5.3" futures = "0.3.28" instant = "0.1.12" @@ -28,7 +27,6 @@ rand = "0.8.5" smallvec = { version = "1.11.1", features = ["union", "const_generics"] } thiserror = "1.0.48" tokio = { version = "1.32.0", features = ["macros", "sync"] } -tokio-util = { version = "0.7.9", features = ["compat"] } tracing = "0.1.37" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 33abf715..63a4442f 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -4,10 +4,9 @@ use std::task::{Context, Poll}; use tracing::warn; use async_trait::async_trait; -use bytes::{BufMut, BytesMut}; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport}, @@ -18,8 +17,7 @@ use libp2p::{ Multiaddr, PeerId, StreamProtocol, }; use prost::Message; -use tokio::io::AsyncReadExt; -use tokio_util::compat::FuturesAsyncReadCompatExt; +use std::mem; use tracing::debug; use tracing::instrument; @@ -291,18 +289,9 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let mut io = io.compat(); - let mut buf = BytesMut::with_capacity(REQUEST_SIZE_MAXIMUM).limit(REQUEST_SIZE_MAXIMUM); + let data = read_up_to(io, REQUEST_SIZE_MAXIMUM).await?; - while let Ok(len) = io.read_buf(&mut buf).await { - if len == 0 { - break; - } - } - - let data = &buf.into_inner()[..]; - - parse_header_request(data) + parse_header_request(&data) .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid request")) } @@ 
-314,19 +303,12 @@ impl Codec for HeaderCodec { where T: AsyncRead + Unpin + Send, { - let mut io = io.compat(); - let mut buf = BytesMut::with_capacity(RESPONSE_SIZE_MAXIMUM).limit(RESPONSE_SIZE_MAXIMUM); - - while let Ok(len) = io.read_buf(&mut buf).await { - if len == 0 { - break; - } - } + let data = read_up_to(io, RESPONSE_SIZE_MAXIMUM).await?; - let mut data = &buf.into_inner()[..]; + let mut data = &data[..]; let mut msgs = Vec::new(); - while let Some((header, rest)) = parse_header_response(data) { + while let Some((header, rest)) = parse_header_response(&data) { msgs.push(header); data = rest; } @@ -347,7 +329,7 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - let mut buf = BytesMut::with_capacity(REQUEST_SIZE_MAXIMUM); + let mut buf = Vec::with_capacity(REQUEST_SIZE_MAXIMUM); let _ = req.encode_length_delimited(&mut buf); @@ -365,7 +347,7 @@ impl Codec for HeaderCodec { where T: AsyncWrite + Unpin + Send, { - let mut buf = BytesMut::with_capacity(RESPONSE_SIZE_MAXIMUM); + let mut buf = Vec::with_capacity(RESPONSE_SIZE_MAXIMUM); for resp in resps { if resp.encode_length_delimited(&mut buf).is_err() { @@ -381,6 +363,35 @@ impl Codec for HeaderCodec { } } +async fn read_up_to(io: &mut T, limit: usize) -> io::Result> +where + T: AsyncRead + Unpin + Send, +{ + let mut buf = Vec::with_capacity(limit); + + loop { + let read_buf: &mut [u8] = unsafe { mem::transmute(buf.spare_capacity_mut()) }; + + if read_buf.is_empty() { + // No empty space. Buffer is full. 
+ break; + } + + let len = io.read(read_buf).await?; + + if len == 0 { + // EOF + break; + } + + unsafe { + buf.set_len(buf.len() + len); + } + } + + Ok(buf) +} + fn parse_delimiter(mut buf: &[u8]) -> Option<(usize, &[u8])> { if buf.is_empty() { return None; From c442bc6109c5707b4ee265a6c3943feac20b8dcc Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 16:34:35 +0300 Subject: [PATCH 03/10] fix clippy --- node/src/exchange.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 63a4442f..e87512ac 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -308,7 +308,7 @@ impl Codec for HeaderCodec { let mut data = &data[..]; let mut msgs = Vec::new(); - while let Some((header, rest)) = parse_header_response(&data) { + while let Some((header, rest)) = parse_header_response(data) { msgs.push(header); data = rest; } From 5232c2e4a8070ec3ddf869593bf180b5f044120c Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 16:40:32 +0300 Subject: [PATCH 04/10] better logs --- node/src/exchange.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index e87512ac..e54c0e30 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -291,6 +291,10 @@ impl Codec for HeaderCodec { { let data = read_up_to(io, REQUEST_SIZE_MAXIMUM).await?; + if data.len() >= REQUEST_SIZE_MAXIMUM { + debug!("Message filled the whole buffer (len: {})", data.len()); + } + parse_header_request(&data) .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid request")) } @@ -305,6 +309,10 @@ impl Codec for HeaderCodec { { let data = read_up_to(io, RESPONSE_SIZE_MAXIMUM).await?; + if data.len() >= RESPONSE_SIZE_MAXIMUM { + debug!("Message filled the whole buffer (len: {})", data.len()); + } + let mut data = &data[..]; let mut msgs = Vec::new(); @@ -353,6 +361,7 @@ impl Codec for HeaderCodec { if 
resp.encode_length_delimited(&mut buf).is_err() { // Error on encoding means the buffer is full. // We will send a partial response back. + debug!("Sending partial response"); break; } } @@ -408,7 +417,7 @@ fn parse_header_response(buf: &[u8]) -> Option<(HeaderResponse, &[u8])> { let (len, rest) = parse_delimiter(buf)?; if rest.len() < len { - debug!("Message is too long: {len}"); + debug!("Message is incomplete: {len}"); return None; } @@ -423,7 +432,7 @@ fn parse_header_request(buf: &[u8]) -> Option { let (len, rest) = parse_delimiter(buf)?; if rest.len() < len { - debug!("Message is too long: {len}"); + debug!("Message is incomplete: {len}"); return None; } From dcd29902b1e7964597ec512952316ad4bbf6e656 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 16:43:43 +0300 Subject: [PATCH 05/10] make transmute more explicit --- node/src/exchange.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index e54c0e30..f3d65d4f 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -17,7 +17,7 @@ use libp2p::{ Multiaddr, PeerId, StreamProtocol, }; use prost::Message; -use std::mem; +use std::mem::{self, MaybeUninit}; use tracing::debug; use tracing::instrument; @@ -379,7 +379,8 @@ where let mut buf = Vec::with_capacity(limit); loop { - let read_buf: &mut [u8] = unsafe { mem::transmute(buf.spare_capacity_mut()) }; + let read_buf_unint: &mut [MaybeUninit] = buf.spare_capacity_mut(); + let read_buf: &mut [u8] = unsafe { mem::transmute(read_buf_unint) }; if read_buf.is_empty() { // No empty space. Buffer is full. 
From aa8f8f71680daa6b222dbdec4d2bd023d3205da6 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 13 Oct 2023 18:35:58 +0300 Subject: [PATCH 06/10] Update node/src/exchange.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Maciej Zwoliński Signed-off-by: Yiannis Marangos --- node/src/exchange.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index f3d65d4f..79b9ef6c 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -379,8 +379,8 @@ where let mut buf = Vec::with_capacity(limit); loop { - let read_buf_unint: &mut [MaybeUninit] = buf.spare_capacity_mut(); - let read_buf: &mut [u8] = unsafe { mem::transmute(read_buf_unint) }; + let read_buf_uninit: &mut [MaybeUninit] = buf.spare_capacity_mut(); + let read_buf: &mut [u8] = unsafe { mem::transmute(read_buf_uninit) }; if read_buf.is_empty() { // No empty space. Buffer is full. From b28afb403f58bbdc894279ab042d9c55ea3d43ce Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sat, 14 Oct 2023 01:05:20 +0300 Subject: [PATCH 07/10] Use pointer casting instead of transmute --- node/src/exchange.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 79b9ef6c..7757b000 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -1,3 +1,4 @@ +use std::future::poll_fn; use std::io; use std::sync::Arc; use std::task::{Context, Poll}; @@ -6,7 +7,7 @@ use tracing::warn; use async_trait::async_trait; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport}, @@ -17,7 +18,8 @@ use libp2p::{ Multiaddr, PeerId, StreamProtocol, }; use
prost::Message; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; +use std::pin::Pin; use tracing::debug; use tracing::instrument; @@ -379,15 +381,19 @@ where let mut buf = Vec::with_capacity(limit); loop { - let read_buf_uninit: &mut [MaybeUninit] = buf.spare_capacity_mut(); - let read_buf: &mut [u8] = unsafe { mem::transmute(read_buf_uninit) }; - - if read_buf.is_empty() { + if buf.len() == buf.capacity() { // No empty space. Buffer is full. break; } - let len = io.read(read_buf).await?; + // We can not keep references from pointer pointer casting across awaits + // so we u poll_fn and poll_read instead. + let len = poll_fn(|cx| { + let read_buf = + unsafe { &mut *(buf.spare_capacity_mut() as *mut [MaybeUninit] as *mut [u8]) }; + Pin::new(&mut *io).poll_read(cx, read_buf) + }) + .await?; if len == 0 { // EOF From df53bdfc3c7ff137f9273d6fdfb90447ecc004c6 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sat, 14 Oct 2023 01:21:19 +0300 Subject: [PATCH 08/10] remove unsafe --- node/src/exchange.rs | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 7757b000..28130b7b 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -1,4 +1,3 @@ -use std::future::poll_fn; use std::io; use std::sync::Arc; use std::task::{Context, Poll}; @@ -7,7 +6,7 @@ use tracing::warn; use async_trait::async_trait; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; use celestia_types::ExtendedHeader; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, request_response::{self, Codec, InboundFailure, OutboundFailure, ProtocolSupport}, @@ -18,8 +17,6 @@ use libp2p::{ Multiaddr, PeerId, StreamProtocol, }; use prost::Message; -use std::mem::MaybeUninit; -use std::pin::Pin; use tracing::debug; use tracing::instrument; @@ -378,33 +375,29 @@ async fn read_up_to(io: &mut T, 
limit: usize) -> io::Result> where T: AsyncRead + Unpin + Send, { - let mut buf = Vec::with_capacity(limit); + let mut buf = Vec::new(); + let mut read_len = 0; + + buf.resize(limit, 0); loop { - if buf.len() == buf.capacity() { + if read_len == buf.len() { // No empty space. Buffer is full. break; } - // We can not keep references from pointer pointer casting across awaits - // so we u poll_fn and poll_read instead. - let len = poll_fn(|cx| { - let read_buf = - unsafe { &mut *(buf.spare_capacity_mut() as *mut [MaybeUninit] as *mut [u8]) }; - Pin::new(&mut *io).poll_read(cx, read_buf) - }) - .await?; + let len = io.read(&mut buf[read_len..]).await?; if len == 0 { // EOF break; } - unsafe { - buf.set_len(buf.len() + len); - } + read_len += len; } + buf.truncate(read_len); + Ok(buf) } From e3781d7be486a66abde6ccceed3d658174f293af Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sat, 14 Oct 2023 01:35:03 +0300 Subject: [PATCH 09/10] increase sleep in peer_discovery test --- node/tests/node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/tests/node.rs b/node/tests/node.rs index 9c87f67b..7d1438d4 100644 --- a/node/tests/node.rs +++ b/node/tests/node.rs @@ -163,7 +163,7 @@ async fn peer_discovery() { node3.p2p().wait_connected().await.unwrap(); // Small wait until all nodes are discovered and connected - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(800)).await; // Check Node1 connected peers let connected_peers = node1.p2p().connected_peers().await.unwrap(); From 837c243825f3be9024fab8d13e3dbc6aebeb6f67 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sun, 15 Oct 2023 13:14:14 +0300 Subject: [PATCH 10/10] fix: use fast zeroed allocation for vec Ref: https://rust-lang.github.io/rust-clippy/master/index.html#/slow_vector_initialization --- node/src/exchange.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 28130b7b..03cef7b6 100644 --- 
a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -375,11 +375,9 @@ async fn read_up_to(io: &mut T, limit: usize) -> io::Result> where T: AsyncRead + Unpin + Send, { - let mut buf = Vec::new(); + let mut buf = vec![0u8; limit]; let mut read_len = 0; - buf.resize(limit, 0); - loop { if read_len == buf.len() { // No empty space. Buffer is full.