From 1b6079cd39f79835b21ac17b1d09052759a138d5 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Fri, 10 Jul 2020 13:19:38 -0700 Subject: [PATCH] Export constants for send and recv buffer sizes Followup to #806, this is simpler than const methods on Connection. Note that while we can change these constants now, setting them to different values will cause some tests to fail. Spending time to fix tests for different const values is not justified yet. --- neqo-http3/src/connection_client.rs | 27 ++++++------ neqo-transport/src/connection.rs | 35 +++++++--------- neqo-transport/src/lib.rs | 2 + neqo-transport/src/recv_stream.rs | 5 ++- neqo-transport/src/send_stream.rs | 64 +++++++++++++---------------- 5 files changed, 62 insertions(+), 71 deletions(-) diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index c595a08c9d..cf2b952da3 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -695,6 +695,7 @@ mod tests { use neqo_qpack::encoder::QPackEncoder; use neqo_transport::{ CloseError, ConnectionEvent, FixedConnectionIdManager, QuicVersion, State, + RECV_BUFFER_SIZE, SEND_BUFFER_SIZE, }; use test_fixture::{ default_server, fixture_init, loopback, now, DEFAULT_ALPN, DEFAULT_SERVER_NAME, @@ -1886,7 +1887,7 @@ mod tests { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read the DATA frame. - let mut buf = vec![1_u8; Connection::stream_recv_buffer_size()]; + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert_eq!(fin, true); assert_eq!( @@ -1940,6 +1941,7 @@ mod tests { // Send 2 data frames so that the second one cannot fit into the send_buf and it is only // partialy sent. We check that the sent data is correct. + #[allow(clippy::useless_vec)] fn fetch_with_two_data_frames( first_frame: &[u8], expected_first_data_frame_header: &[u8], @@ -1958,10 +1960,7 @@ mod tests { assert_eq!(sent, Ok(first_frame.len())); // The second frame cannot fit. - let sent = client.send_request_body( - request_stream_id, - &vec![0_u8; Connection::stream_recv_buffer_size()], - ); + let sent = client.send_request_body(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]); assert_eq!(sent, Ok(expected_second_data_frame.len())); // Close stream. @@ -1970,7 +1969,7 @@ mod tests { let mut out = client.process(None, now()); // We need to loop a bit until all data has been sent. Once for every 1K // of data. - for _i in 0..Connection::stream_send_buffer_size() / 1000 { + for _i in 0..SEND_BUFFER_SIZE / 1000 { out = server.conn.process(out.dgram(), now()); out = client.process(out.dgram(), now()); } @@ -1980,7 +1979,7 @@ mod tests { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read DATA frames. - let mut buf = vec![1_u8; Connection::stream_recv_buffer_size()]; + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert_eq!(fin, true); assert_eq!( @@ -2033,7 +2032,7 @@ mod tests { // After the first frame there is exactly 63+2 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_63bytes() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 88); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } @@ -2042,7 +2041,7 @@ mod tests { // but we can only send 63 bytes. #[test] fn fetch_two_data_frame_second_63bytes_place_for_66() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 89); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } @@ -2051,7 +2050,7 @@ mod tests { // but we can only send 64 bytes. #[test] fn fetch_two_data_frame_second_64bytes_place_for_67() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 90); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]); } @@ -2059,7 +2058,7 @@ mod tests { // After the first frame there is exactly 16383+3 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_16383bytes() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16409); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2067,7 +2066,7 @@ mod tests { // After the first frame there is exactly 16383+4 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16387() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16410); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2075,7 +2074,7 @@ mod tests { // After the first frame there is exactly 16383+5 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16388() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16411); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } @@ -2083,7 +2082,7 @@ mod tests { // After the first frame there is exactly 16384+5 bytes left in the send buffer, but we can send 16384 bytes. #[test] fn fetch_two_data_frame_second_16384bytes_place_for_16389() { - let (buf, hdr) = alloc_buffer(Connection::stream_send_buffer_size() - 16412); + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]); } diff --git a/neqo-transport/src/connection.rs b/neqo-transport/src/connection.rs index f13cd35dec..ab8581e4c0 100644 --- a/neqo-transport/src/connection.rs +++ b/neqo-transport/src/connection.rs @@ -43,8 +43,8 @@ use crate::packet::{ use crate::path::Path; use crate::qlog; use crate::recovery::{LossRecovery, RecoveryToken, SendProfile, GRANULARITY}; -use crate::recv_stream::{RecvStream, RecvStreams, RX_STREAM_DATA_WINDOW}; -use crate::send_stream::{SendStream, SendStreams, TxBuffer}; +use crate::recv_stream::{RecvStream, RecvStreams, RECV_BUFFER_SIZE}; +use crate::send_stream::{SendStream, SendStreams}; use crate::stats::Stats; use crate::stream_id::{StreamId, StreamIndex, StreamIndexes}; use crate::tparams::{ @@ -526,13 +526,16 @@ impl Connection { fn set_tp_defaults(tps: &mut TransportParameters) { tps.set_integer( tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL, - RX_STREAM_DATA_WINDOW, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), ); tps.set_integer( tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE, - RX_STREAM_DATA_WINDOW, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), + ); + tps.set_integer( + tparams::INITIAL_MAX_STREAM_DATA_UNI, + u64::try_from(RECV_BUFFER_SIZE).unwrap(), ); - tps.set_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI, RX_STREAM_DATA_WINDOW); tps.set_integer(tparams::INITIAL_MAX_STREAMS_BIDI, LOCAL_STREAM_LIMIT_BIDI); tps.set_integer(tparams::INITIAL_MAX_STREAMS_UNI, LOCAL_STREAM_LIMIT_UNI); tps.set_integer(tparams::INITIAL_MAX_DATA, LOCAL_MAX_DATA); @@ -2588,16 +2591,6 @@ impl Connection { Ok(()) } - /// The per-stream send buffer size. - pub const fn stream_send_buffer_size() -> usize { - TxBuffer::BUFFER_SIZE - } - - /// The per-stream receive max data size. - pub const fn stream_recv_buffer_size() -> usize { - RX_STREAM_DATA_WINDOW as usize - } - /// Get all current events. Best used just in debug/testing code, use /// next_event() instead. pub fn events(&mut self) -> impl Iterator { @@ -2633,7 +2626,7 @@ mod tests { use crate::path::PATH_MTU_V6; use crate::recovery::ACK_ONLY_SIZE_LIMIT; use crate::recovery::PTO_PACKET_COUNT; - use crate::send_stream::TxBuffer; + use crate::send_stream::SEND_BUFFER_SIZE; use crate::tracking::{ACK_DELAY, MAX_UNACKED_PKTS}; use std::convert::TryInto; @@ -3591,7 +3584,7 @@ mod tests { ); assert_eq!( client - .stream_send(stream_id, &[b'a'; RX_STREAM_DATA_WINDOW as usize]) + .stream_send(stream_id, &[b'a'; RECV_BUFFER_SIZE]) .unwrap(), SMALL_MAX_DATA ); @@ -3619,7 +3612,7 @@ mod tests { client.handle_max_data(100_000_000); assert_eq!( client.stream_avail_send_space(stream_id).unwrap(), - TxBuffer::BUFFER_SIZE - SMALL_MAX_DATA + SEND_BUFFER_SIZE - SMALL_MAX_DATA ); // Increase max stream data. Avail space now limited by tx buffer @@ -3630,7 +3623,7 @@ mod tests { .set_max_stream_data(100_000_000); assert_eq!( client.stream_avail_send_space(stream_id).unwrap(), - TxBuffer::BUFFER_SIZE - SMALL_MAX_DATA + 4096 + SEND_BUFFER_SIZE - SMALL_MAX_DATA + 4096 ); let evts = client.events().collect::>(); @@ -5786,7 +5779,7 @@ mod tests { server .flow_mgr .borrow_mut() - .stream_data_blocked(3.into(), RX_STREAM_DATA_WINDOW * 4); + .stream_data_blocked(3.into(), RECV_BUFFER_SIZE as u64 * 4); let out = server.process(None, now); assert!(out.as_dgram_ref().is_some()); @@ -5804,7 +5797,7 @@ mod tests { // window value. assert!(frames.iter().any( |(f, _)| matches!(f, Frame::MaxStreamData { maximum_stream_data, .. } - if *maximum_stream_data == RX_STREAM_DATA_WINDOW) + if *maximum_stream_data == RECV_BUFFER_SIZE as u64) )); } } diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs index 21e8c345c4..8b373cab7d 100644 --- a/neqo-transport/src/lib.rs +++ b/neqo-transport/src/lib.rs @@ -39,6 +39,8 @@ pub use self::packet::QuicVersion; pub use self::stream_id::StreamId; const LOCAL_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); // 30 second +pub use self::recv_stream::RECV_BUFFER_SIZE; +pub use self::send_stream::SEND_BUFFER_SIZE; type TransportError = u64; const ERROR_APPLICATION_CLOSE: TransportError = 12; diff --git a/neqo-transport/src/recv_stream.rs b/neqo-transport/src/recv_stream.rs index a130f84791..0a208aa5b3 100644 --- a/neqo-transport/src/recv_stream.rs +++ b/neqo-transport/src/recv_stream.rs @@ -23,7 +23,10 @@ use crate::stream_id::StreamId; use crate::{AppError, Error, Res}; use neqo_common::qtrace; -pub const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB +const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB + +// Export as usize for consistency with SEND_BUFFER_SIZE +pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize; pub(crate) type RecvStreams = BTreeMap; diff --git a/neqo-transport/src/send_stream.rs b/neqo-transport/src/send_stream.rs index 99a3d3f807..0a6ddceed3 100644 --- a/neqo-transport/src/send_stream.rs +++ b/neqo-transport/src/send_stream.rs @@ -25,6 +25,8 @@ use crate::stream_id::StreamId; use crate::tracking::PNSpace; use crate::{AppError, Error, Res}; +pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB + #[derive(Debug, PartialEq, Clone, Copy)] enum RangeState { Sent, @@ -280,21 +282,19 @@ pub struct TxBuffer { } impl TxBuffer { - pub const BUFFER_SIZE: usize = 0x10_0000; // 1 MiB - pub fn new() -> Self { Self { - send_buf: VecDeque::with_capacity(Self::BUFFER_SIZE), + send_buf: VecDeque::with_capacity(SEND_BUFFER_SIZE), ..Self::default() } } /// Attempt to add some or all of the passed-in buffer to the TxBuffer. pub fn send(&mut self, buf: &[u8]) -> usize { - let can_buffer = min(Self::BUFFER_SIZE - self.buffered(), buf.len()); + let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); if can_buffer > 0 { self.send_buf.extend(&buf[..can_buffer]); - assert!(self.send_buf.len() <= Self::BUFFER_SIZE); + assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); } can_buffer } @@ -364,7 +364,7 @@ impl TxBuffer { } fn avail(&self) -> usize { - Self::BUFFER_SIZE - self.buffered() + SEND_BUFFER_SIZE - self.buffered() } pub fn highest_sent(&self) -> u64 { @@ -409,7 +409,7 @@ impl SendStreamState { fn tx_avail(&self) -> u64 { match self { // In Ready, TxBuffer not yet allocated but size is known - Self::Ready => TxBuffer::BUFFER_SIZE.try_into().unwrap(), + Self::Ready => SEND_BUFFER_SIZE.try_into().unwrap(), Self::Send { send_buf } | Self::DataSent { send_buf, .. } => { send_buf.avail().try_into().unwrap() } @@ -924,19 +924,16 @@ mod tests { fn tx_buffer_next_bytes_1() { let mut txb = TxBuffer::new(); - assert_eq!(txb.avail(), TxBuffer::BUFFER_SIZE); + assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!( - txb.send(&[1; TxBuffer::BUFFER_SIZE * 2]), - TxBuffer::BUFFER_SIZE - ); + assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==TxBuffer::BUFFER_SIZE + Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // Mark almost all as sent. Get what's left - let one_byte_from_end = TxBuffer::BUFFER_SIZE as u64 - 1; + let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1; txb.mark_as_sent(0, one_byte_from_end as usize); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 1 @@ -944,7 +941,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Mark all as sent. Get nothing - txb.mark_as_sent(0, TxBuffer::BUFFER_SIZE); + txb.mark_as_sent(0, SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), None)); // Mark as lost. Get it again @@ -956,7 +953,7 @@ mod tests { // Mark a larger range lost, including beyond what's in the buffer even. // Get a little more - let five_bytes_from_end = TxBuffer::BUFFER_SIZE as u64 - 5; + let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5; txb.mark_as_lost(five_bytes_from_end, 100); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 5 @@ -981,7 +978,7 @@ mod tests { txb.mark_as_sent(five_bytes_from_end, 5); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 30 - && start == TxBuffer::BUFFER_SIZE as u64 + && start == SEND_BUFFER_SIZE as u64 && x.iter().all(|ch| *ch == 2))); } @@ -989,19 +986,16 @@ mod tests { fn tx_buffer_next_bytes_2() { let mut txb = TxBuffer::new(); - assert_eq!(txb.avail(), TxBuffer::BUFFER_SIZE); + assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!( - txb.send(&[1; TxBuffer::BUFFER_SIZE * 2]), - TxBuffer::BUFFER_SIZE - ); + assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==TxBuffer::BUFFER_SIZE + Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // As above - let forty_bytes_from_end = TxBuffer::BUFFER_SIZE as u64 - 40; + let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40; txb.mark_as_acked(0, forty_bytes_from_end as usize); assert!(matches!(txb.next_bytes(), @@ -1021,7 +1015,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Mark a range 'A' in second slice as sent. Should still return the same - let range_a_start = TxBuffer::BUFFER_SIZE as u64 + 30; + let range_a_start = SEND_BUFFER_SIZE as u64 + 30; let range_a_end = range_a_start + 10; txb.mark_as_sent(range_a_start, 10); assert!(matches!(txb.next_bytes(), @@ -1030,7 +1024,7 @@ mod tests { && x.iter().all(|ch| *ch == 1))); // Ack entire first slice and into second slice - let ten_bytes_past_end = TxBuffer::BUFFER_SIZE as u64 + 10; + let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10; txb.mark_as_acked(0, ten_bytes_past_end as usize); // Get up to marked range A @@ -1066,25 +1060,25 @@ mod tests { assert_eq!(s.state.tx_buf().unwrap().data_limit(), 100); // Should hit stream flow control limit before filling up send buffer - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 1024 - 100); // should do nothing, max stream data already 1024 s.set_max_stream_data(1024); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 0); // should now hit the conn flow control (4096) s.set_max_stream_data(1_048_576); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE]).unwrap(); + let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 3072); // should now hit the tx buffer size flow_mgr .borrow_mut() - .conn_increase_max_credit(TxBuffer::BUFFER_SIZE as u64); - let res = s.send(&[4; TxBuffer::BUFFER_SIZE + 100]).unwrap(); - assert_eq!(res, TxBuffer::BUFFER_SIZE - 4096); + .conn_increase_max_credit(SEND_BUFFER_SIZE as u64); + let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap(); + assert_eq!(res, SEND_BUFFER_SIZE - 4096); // TODO(agrover@mozilla.com): test ooo acks somehow s.mark_as_acked(0, 40, false); @@ -1156,11 +1150,11 @@ mod tests { // Unblocking both by a large amount will cause avail() to be limited by // tx buffer size. - assert_eq!(s.avail(), TxBuffer::BUFFER_SIZE - 4); + assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4); assert_eq!( - s.send(&[b'a'; TxBuffer::BUFFER_SIZE]).unwrap(), - TxBuffer::BUFFER_SIZE - 4 + s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(), + SEND_BUFFER_SIZE - 4 ); // No event because still blocked by tx buffer full