diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index c595a08c9d..221c1be1c3 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -695,6 +695,7 @@ mod tests { use neqo_qpack::encoder::QPackEncoder; use neqo_transport::{ CloseError, ConnectionEvent, FixedConnectionIdManager, QuicVersion, State, + RECV_BUFFER_SIZE, SEND_BUFFER_SIZE, }; use test_fixture::{ default_server, fixture_init, loopback, now, DEFAULT_ALPN, DEFAULT_SERVER_NAME, @@ -1886,7 +1887,7 @@ mod tests { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read the DATA frame. - let mut buf = vec![1_u8; Connection::stream_recv_buffer_size()]; + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert_eq!(fin, true); assert_eq!( @@ -1958,10 +1959,7 @@ mod tests { assert_eq!(sent, Ok(first_frame.len())); // The second frame cannot fit. - let sent = client.send_request_body( - request_stream_id, - &vec![0_u8; Connection::stream_recv_buffer_size()], - ); + let sent = client.send_request_body(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]); assert_eq!(sent, Ok(expected_second_data_frame.len())); // Close stream. @@ -1970,7 +1968,7 @@ mod tests { let mut out = client.process(None, now()); // We need to loop a bit until all data has been sent. Once for every 1K // of data. - for _i in 0..Connection::stream_send_buffer_size() / 1000 { + for _i in 0..SEND_BUFFER_SIZE / 1000 { out = server.conn.process(out.dgram(), now()); out = client.process(out.dgram(), now()); } @@ -1980,7 +1978,7 @@ mod tests { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read DATA frames. - let mut buf = vec![1_u8; Connection::stream_recv_buffer_size()]; + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert_eq!(fin, true); assert_eq!( @@ -2033,7 +2031,7 @@ mod tests { // After the first frame there is exactly 63+2 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_63bytes() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 88); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } @@ -2042,7 +2040,7 @@ mod tests { // but we can only send 63 bytes. #[test] fn fetch_two_data_frame_second_63bytes_place_for_66() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 89); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } @@ -2051,7 +2049,7 @@ mod tests { // but we can only send 64 bytes. #[test] fn fetch_two_data_frame_second_64bytes_place_for_67() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 90); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]); } @@ -2059,7 +2057,7 @@ mod tests { // After the first frame there is exactly 16383+3 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_16383bytes() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16409); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2067,7 +2065,7 @@ mod tests { // After the first frame there is exactly 16383+4 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16387() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16410); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2075,7 +2073,7 @@ mod tests { // After the first frame there is exactly 16383+5 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16388() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16411); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2083,7 +2081,7 @@ mod tests { // After the first frame there is exactly 16384+5 bytes left in the send buffer, but we can send 16384 bytes. #[test] fn fetch_two_data_frame_second_16384bytes_place_for_16389() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16412); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]); } diff --git a/neqo-transport/src/connection.rs b/neqo-transport/src/connection.rs index f13cd35dec..ab8581e4c0 100644 --- a/neqo-transport/src/connection.rs +++ b/neqo-transport/src/connection.rs @@ -43,8 +43,8 @@ use crate::packet::{ use crate::path::Path; use crate::qlog; use crate::recovery::{LossRecovery, RecoveryToken, SendProfile, GRANULARITY}; -use crate::recv_stream::{RecvStream, RecvStreams, RX_STREAM_DATA_WINDOW}; -use crate::send_stream::{SendStream, SendStreams, TxBuffer}; +use crate::recv_stream::{RecvStream, RecvStreams, RECV_BUFFER_SIZE}; +use crate::send_stream::{SendStream, SendStreams}; use crate::stats::Stats; use crate::stream_id::{StreamId, StreamIndex, StreamIndexes}; use crate::tparams::{ @@ -526,13 +526,16 @@ impl Connection { fn set_tp_defaults(tps: &mut TransportParameters) { tps.set_integer( tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL, - RX_STREAM_DATA_WINDOW, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), ); tps.set_integer( tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE, - RX_STREAM_DATA_WINDOW, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), + ); + tps.set_integer( + tparams::INITIAL_MAX_STREAM_DATA_UNI, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), ); - tps.set_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI, RX_STREAM_DATA_WINDOW); tps.set_integer(tparams::INITIAL_MAX_STREAMS_BIDI, LOCAL_STREAM_LIMIT_BIDI); tps.set_integer(tparams::INITIAL_MAX_STREAMS_UNI, LOCAL_STREAM_LIMIT_UNI); tps.set_integer(tparams::INITIAL_MAX_DATA, LOCAL_MAX_DATA); @@ -2588,16 +2591,6 @@ impl Connection { Ok(()) } - /// The per-stream send buffer size. - pub const fn stream_send_buffer_size() -> usize { - TxBuffer::BUFFER_SIZE - } - - /// The per-stream receive max data size. - pub const fn stream_recv_buffer_size() -> usize { - RX_STREAM_DATA_WINDOW as usize - } - /// Get all current events. Best used just in debug/testing code, use /// next_event() instead. pub fn events(&mut self) -> impl Iterator { @@ -2633,7 +2626,7 @@ mod tests { use crate::path::PATH_MTU_V6; use crate::recovery::ACK_ONLY_SIZE_LIMIT; use crate::recovery::PTO_PACKET_COUNT; - use crate::send_stream::TxBuffer; + use crate::send_stream::SEND_BUFFER_SIZE; use crate::tracking::{ACK_DELAY, MAX_UNACKED_PKTS}; use std::convert::TryInto; @@ -3591,7 +3584,7 @@ mod tests { ); assert_eq!( client - .stream_send(stream_id, &[b'a'; RX_STREAM_DATA_WINDOW as usize]) + .stream_send(stream_id, &[b'a'; RECV_BUFFER_SIZE]) .unwrap(), SMALL_MAX_DATA ); @@ -3619,7 +3612,7 @@ mod tests { client.handle_max_data(100_000_000); assert_eq!( client.stream_avail_send_space(stream_id).unwrap(), - TxBuffer::BUFFER_SIZE - SMALL_MAX_DATA + SEND_BUFFER_SIZE - SMALL_MAX_DATA ); // Increase max stream data. Avail space now limited by tx buffer @@ -3630,7 +3623,7 @@ mod tests { .set_max_stream_data(100_000_000); assert_eq!( client.stream_avail_send_space(stream_id).unwrap(), - TxBuffer::BUFFER_SIZE - SMALL_MAX_DATA + 4096 + SEND_BUFFER_SIZE - SMALL_MAX_DATA + 4096 ); let evts = client.events().collect::>(); @@ -5786,7 +5779,7 @@ mod tests { server .flow_mgr .borrow_mut() - .stream_data_blocked(3.into(), RX_STREAM_DATA_WINDOW * 4); + .stream_data_blocked(3.into(), RECV_BUFFER_SIZE as u64 * 4); let out = server.process(None, now); assert!(out.as_dgram_ref().is_some()); @@ -5804,7 +5797,7 @@ mod tests { // window value. assert!(frames.iter().any( |(f, _)| matches!(f, Frame::MaxStreamData { maximum_stream_data, .. } - if *maximum_stream_data == RX_STREAM_DATA_WINDOW) + if *maximum_stream_data == RECV_BUFFER_SIZE as u64) )); } } diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs index 21e8c345c4..8b373cab7d 100644 --- a/neqo-transport/src/lib.rs +++ b/neqo-transport/src/lib.rs @@ -39,6 +39,8 @@ pub use self::packet::QuicVersion; pub use self::stream_id::StreamId; const LOCAL_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); // 30 second +pub use self::recv_stream::RECV_BUFFER_SIZE; +pub use self::send_stream::SEND_BUFFER_SIZE; type TransportError = u64; const ERROR_APPLICATION_CLOSE: TransportError = 12; diff --git a/neqo-transport/src/recv_stream.rs b/neqo-transport/src/recv_stream.rs index a130f84791..0a208aa5b3 100644 --- a/neqo-transport/src/recv_stream.rs +++ b/neqo-transport/src/recv_stream.rs @@ -23,7 +23,10 @@ use crate::stream_id::StreamId; use crate::{AppError, Error, Res}; use neqo_common::qtrace; -pub const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB +const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB + +// Export as usize for consistency with SEND_BUFFER_SIZE +pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize; pub(crate) type RecvStreams = BTreeMap; diff --git a/neqo-transport/src/send_stream.rs b/neqo-transport/src/send_stream.rs index 99a3d3f807..0a6ddceed3 100644 --- a/neqo-transport/src/send_stream.rs +++ b/neqo-transport/src/send_stream.rs @@ -25,6 +25,8 @@ use crate::stream_id::StreamId; use crate::tracking::PNSpace; use crate::{AppError, Error, Res}; +pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB + #[derive(Debug, PartialEq, Clone, Copy)] enum RangeState { Sent, @@ -280,21 +282,19 @@ pub struct TxBuffer { } impl TxBuffer { - pub const BUFFER_SIZE: usize = 0x10_0000; // 1 MiB - pub fn new() -> Self { Self { - send_buf: VecDeque::with_capacity(Self::BUFFER_SIZE), + send_buf: VecDeque::with_capacity(SEND_BUFFER_SIZE), ..Self::default() } } /// Attempt to add some or all of the passed-in buffer to the TxBuffer. pub fn send(&mut self, buf: &[u8]) -> usize { - let can_buffer = min(Self::BUFFER_SIZE - self.buffered(), buf.len()); + let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); if can_buffer > 0 { self.send_buf.extend(&buf[..can_buffer]); - assert!(self.send_buf.len() <= Self::BUFFER_SIZE); + assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); } can_buffer } @@ -364,7 +364,7 @@ impl TxBuffer { } fn avail(&self) -> usize { - Self::BUFFER_SIZE - self.buffered() + SEND_BUFFER_SIZE - self.buffered() } pub fn highest_sent(&self) -> u64 { @@ -409,7 +409,7 @@ impl SendStreamState { fn tx_avail(&self) -> u64 { match self { // In Ready, TxBuffer not yet allocated but size is known - Self::Ready => TxBuffer::BUFFER_SIZE.try_into().unwrap(), + Self::Ready => SEND_BUFFER_SIZE.try_into().unwrap(), Self::Send { send_buf } | Self::DataSent { send_buf, .. } => { send_buf.avail().try_into().unwrap() } @@ -924,19 +924,16 @@ mod tests { fn tx_buffer_next_bytes_1() { let mut txb = TxBuffer::new(); - assert_eq!(txb.avail(), TxBuffer::BUFFER_SIZE); + assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!( - txb.send(&[1; TxBuffer::BUFFER_SIZE * 2]), - TxBuffer::BUFFER_SIZE - ); + assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==TxBuffer::BUFFER_SIZE + Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // Mark almost all as sent. Get what's left - let one_byte_from_end = TxBuffer::BUFFER_SIZE as u64 - 1; + let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1; txb.mark_as_sent(0, one_byte_from_end as usize); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 1 @@ -944,7 +941,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Mark all as sent. Get nothing - txb.mark_as_sent(0, TxBuffer::BUFFER_SIZE); + txb.mark_as_sent(0, SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), None)); // Mark as lost. Get it again @@ -956,7 +953,7 @@ mod tests { // Mark a larger range lost, including beyond what's in the buffer even. // Get a little more - let five_bytes_from_end = TxBuffer::BUFFER_SIZE as u64 - 5; + let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5; txb.mark_as_lost(five_bytes_from_end, 100); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 5 @@ -981,7 +978,7 @@ mod tests { txb.mark_as_sent(five_bytes_from_end, 5); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 30 - && start == TxBuffer::BUFFER_SIZE as u64 + && start == SEND_BUFFER_SIZE as u64 && x.iter().all(|ch| *ch == 2))); } @@ -989,19 +986,16 @@ mod tests { fn tx_buffer_next_bytes_2() { let mut txb = TxBuffer::new(); - assert_eq!(txb.avail(), TxBuffer::BUFFER_SIZE); + assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!( - txb.send(&[1; TxBuffer::BUFFER_SIZE * 2]), - TxBuffer::BUFFER_SIZE - ); + assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==TxBuffer::BUFFER_SIZE + Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // As above - let forty_bytes_from_end = TxBuffer::BUFFER_SIZE as u64 - 40; + let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40; txb.mark_as_acked(0, forty_bytes_from_end as usize); assert!(matches!(txb.next_bytes(), @@ -1021,7 +1015,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Mark a range 'A' in second slice as sent. Should still return the same - let range_a_start = TxBuffer::BUFFER_SIZE as u64 + 30; + let range_a_start = SEND_BUFFER_SIZE as u64 + 30; let range_a_end = range_a_start + 10; txb.mark_as_sent(range_a_start, 10); assert!(matches!(txb.next_bytes(), @@ -1030,7 +1024,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Ack entire first slice and into second slice - let ten_bytes_past_end = TxBuffer::BUFFER_SIZE as u64 + 10; + let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10; txb.mark_as_acked(0, ten_bytes_past_end as usize); // Get up to marked range A @@ -1066,25 +1060,25 @@ mod tests { assert_eq!(s.state.tx_buf().unwrap().data_limit(), 100); // Should hit stream flow control limit before filling up send buffer - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 1024 - 100); // should do nothing, max stream data already 1024 s.set_max_stream_data(1024); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 0); // should now hit the conn flow control (4096) s.set_max_stream_data(1_048_576); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 3072); // should now hit the tx buffer size flow_mgr .borrow_mut() - .conn_increase_max_credit(TxBuffer::BUFFER_SIZE as u64); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE + 100]).unwrap(); - assert_eq!(res, TxBuffer::BUFFER_SIZE - 4096); + .conn_increase_max_credit(SEND_BUFFER_SIZE as u64); + let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap(); + assert_eq!(res, SEND_BUFFER_SIZE - 4096); // TODO(agrover@mozilla.com): test ooo acks somehow s.mark_as_acked(0, 40, false); @@ -1156,11 +1150,11 @@ mod tests { // Unblocking both by a large amount will cause avail() to be limited by // tx buffer size. - assert_eq!(s.avail(), TxBuffer::BUFFER_SIZE - 4); + assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4); assert_eq!( - s.send(&[b'a'; TxBuffer::BUFFER_SIZE]).unwrap(), - TxBuffer::BUFFER_SIZE - 4 + s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(), + SEND_BUFFER_SIZE - 4 ); // No event because still blocked by tx buffer full