Skip to content

Commit

Permalink
netbench: fix test timer expiration (#1194)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Feb 25, 2022
1 parent 96b0e6e commit 821f5c2
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 107 deletions.
8 changes: 4 additions & 4 deletions netbench/netbench/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ impl<'a, C: Connection> Driver<'a, C> {
let mut poll_accept = false;
let mut all_ready = true;

if let Some(target) = timer::Provider::next_expiration(&self) {
all_ready |= timer.poll(target, cx).is_ready();
};

trace.enter(now, 0, 0);
let result = self.local_thread.poll(
&mut self.connection,
Expand Down Expand Up @@ -128,6 +124,10 @@ impl<'a, C: Connection> Driver<'a, C> {
}
}

if let Some(target) = timer::Provider::next_expiration(&self) {
all_ready |= timer.poll(target, cx).is_ready();
};

if all_ready {
self.is_finished = true;
self.connection.poll_finish(cx)
Expand Down
185 changes: 110 additions & 75 deletions netbench/netbench/src/multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,15 @@ impl<T: AsyncRead + AsyncWrite> Connection<T> {
}
}

fn fill_read_buffer(&mut self, cx: &mut Context) -> Poll<Result<bool>> {
// only read from the socket if it's open and we don't have a pending frame
if !(self.rx_open && self.frame.is_none()) {
return Ok(true).into();
fn fill_read_buffer(&mut self, cx: &mut Context) -> Poll<Result<()>> {
// don't fill the buffer if we have a pending frame
if self.frame.is_some() {
return Poll::Pending;
}

// the socket is closed
if !self.rx_open {
return Ok(()).into();
}

let rx_open = &mut self.rx_open;
Expand All @@ -188,7 +193,7 @@ impl<T: AsyncRead + AsyncWrite> Connection<T> {
Ok(()).into()
}))?;

Ok(false).into()
Ok(()).into()
}

fn dispatch_frame(&mut self, cx: &mut Context) -> Poll<Result<()>> {
Expand Down Expand Up @@ -424,24 +429,36 @@ impl<T: AsyncRead + AsyncWrite> super::Connection for Connection<T> {

self.flush_write_buffer(cx)?;
self.flush_read_buffer(cx)?;
if ready!(self.fill_read_buffer(cx))? {
ready!(self.fill_read_buffer(cx))?;

// the connection is done
if !self.rx_open && self.frame.is_none() {
return Ok(()).into();
}
}
}

fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<()>> {
loop {
if self.tx_open {
self.flush_write_buffer(cx)?;
self.flush_read_buffer(cx)?;

// wait to shutdown the socket until we have written everything
if !self.write_buf.is_empty() {
return Poll::Pending;
}

// notify the peer we're not writing anything anymore
ready!(self.inner.as_mut().poll_shutdown(cx))?;
self.tx_open = false;
}

loop {
// work to read all of the remaining data
self.flush_read_buffer(cx)?;
ready!(self.fill_read_buffer(cx))?;

if ready!(self.fill_read_buffer(cx))? {
// the connection is done
if !self.rx_open && self.frame.is_none() {
return Ok(()).into();
}
}
Expand Down Expand Up @@ -471,6 +488,76 @@ mod tests {
use insta::assert_display_snapshot;
use std::collections::HashSet;

fn test(config: Config, scenario: &Scenario) -> (MemoryLogger, MemoryLogger) {
let traces = &scenario.traces;

let (client, server) = testing::Connection::pair(10000);

let mut client = {
let scenario = &scenario.clients[0].connections[0];
let conn = Box::pin(client);
let conn = super::Connection::new(conn, config.clone());
Driver::new(scenario, conn)
};
let mut client_trace = MemoryLogger::new(0, traces);
let mut client_checkpoints = HashSet::new();
let mut client_timer = timer::Testing::default();

let mut server = {
let scenario = &scenario.servers[0].connections[0];
let conn = Box::pin(server);
let conn = super::Connection::new(conn, config);
Driver::new(scenario, conn)
};
let mut server_trace = MemoryLogger::new(1, traces);
let mut server_checkpoints = HashSet::new();
let mut server_timer = timer::Testing::default();

let (waker, count) = new_count_waker();
let mut prev_count = 0;
let mut cx = core::task::Context::from_waker(&waker);

loop {
let c = client.poll(
&mut client_trace,
&mut client_checkpoints,
&mut client_timer,
&mut cx,
);
let s = server.poll(
&mut server_trace,
&mut server_checkpoints,
&mut server_timer,
&mut cx,
);

match (c, s) {
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => break,
(Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => panic!("{}", e),
_ => {
let current_count = count.get();
if current_count > prev_count {
prev_count = current_count;
continue;
}

if client_timer.advance_pair(&mut server_timer).is_none() {
eprintln!("the timer did not advance!");
eprintln!("server trace:");
eprintln!("{}", server_trace.as_str().unwrap());
eprintln!("{:#?}", server);
eprintln!("client trace:");
eprintln!("{}", client_trace.as_str().unwrap());
eprintln!("{:#?}", client);
panic!("test is deadlocked");
}
}
}
}

(client_trace, server_trace)
}

macro_rules! test {
($name:ident, $config:expr, $builder:expr) => {
#[test]
Expand All @@ -482,72 +569,8 @@ mod tests {
client.connect_to(server, $builder);
});
});
let traces = &scenario.traces;

let (client, server) = testing::Connection::pair(10000);

let mut client = {
let scenario = &scenario.clients[0].connections[0];
let conn = Box::pin(client);
let conn = super::Connection::new(conn, $config);
Driver::new(scenario, conn)
};
let mut client_trace = MemoryLogger::new(0, traces);
let mut client_checkpoints = HashSet::new();
let mut client_timer = timer::Testing::default();

let mut server = {
let scenario = &scenario.servers[0].connections[0];
let conn = Box::pin(server);
let conn = super::Connection::new(conn, $config);
Driver::new(scenario, conn)
};
let mut server_trace = MemoryLogger::new(1, traces);
let mut server_checkpoints = HashSet::new();
let mut server_timer = timer::Testing::default();

let (waker, count) = new_count_waker();
let mut prev_count = 0;
let mut cx = core::task::Context::from_waker(&waker);

loop {
let c = client.poll(
&mut client_trace,
&mut client_checkpoints,
&mut client_timer,
&mut cx,
);
let s = server.poll(
&mut server_trace,
&mut server_checkpoints,
&mut server_timer,
&mut cx,
);

match (c, s) {
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => break,
(Poll::Ready(Err(e)), _) => return Err(e.into()),
(_, Poll::Ready(Err(e))) => return Err(e.into()),
_ => {
let current_count = count.get();
if current_count > prev_count {
prev_count = current_count;
continue;
}

if client_timer.advance_pair(&mut server_timer).is_none() {
eprintln!("the timer did not advance!");
eprintln!("server trace:");
eprintln!("{}", server_trace.as_str().unwrap());
eprintln!("{:#?}", server);
eprintln!("client trace:");
eprintln!("{}", client_trace.as_str().unwrap());
eprintln!("{:#?}", client);
panic!("test is deadlocked");
}
}
}
}
let (client_trace, server_trace) = test($config, &scenario);

assert_display_snapshot!(
concat!(stringify!($name), "__client"),
Expand All @@ -574,7 +597,7 @@ mod tests {
);
});

test!(single_slow_stream, Config::default(), |conn| {
test!(single_slow_send_stream, Config::default(), |conn| {
conn.open_send_stream(
|local| {
local.set_send_rate(100.bytes() / 50.millis());
Expand All @@ -586,6 +609,18 @@ mod tests {
);
});

test!(single_slow_recv_stream, Config::default(), |conn| {
conn.open_send_stream(
|local| {
local.send(1.kilobytes());
},
|remote| {
remote.set_receive_rate(100.bytes() / 50.millis());
remote.receive(1.kilobytes());
},
);
});

test!(
low_stream_window,
Config {
Expand Down
5 changes: 4 additions & 1 deletion netbench/netbench/src/multiplex/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ pub struct Controller {

impl Controller {
pub fn new(local_window: u64, peer_window: u64) -> Self {
let local_window = local_window.max(1);
let peer_window = peer_window.max(1);
Self {
open_offset: 0,
peer_window_offset: local_window,
Expand Down Expand Up @@ -165,7 +167,8 @@ impl Controller {
}

pub fn transmit(&mut self) -> Option<u64> {
if self.local_window_offset != self.transmitted_window_offset {
// send a max streams if we are using more than half capacity
if self.local_window_offset >= self.transmitted_window_offset * 2 {
self.transmitted_window_offset = self.local_window_offset;
Some(self.local_window_offset)
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
source: netbench/src/multiplex.rs
assertion_line: 635
expression: (client_trace.1).as_str().unwrap()
assertion_line: 685
expression: client_trace.as_str().unwrap()

---
0:00:00.000001: 0:0.0:exec: Scope { threads: [[OpenSendStream { stream_id: 0 }, SendRate { stream_id: 0, rate: Rate { bytes: Byte(100), period: 50ms } }, Send { stream_id: 0, bytes: Byte(1000) }, Unpark { checkpoint: 0 }, Send { stream_id: 0, bytes: Byte(1000) }, SendFinish { stream_id: 0 }], [OpenSendStream { stream_id: 1 }, Park { checkpoint: 0 }, SendRate { stream_id: 1, rate: Rate { bytes: Byte(100), period: 50ms } }, Send { stream_id: 1, bytes: Byte(1000) }, SendFinish { stream_id: 1 }]] }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
source: netbench/src/multiplex.rs
assertion_line: 637
expression: (server_trace.1).as_str().unwrap()
assertion_line: 685
expression: server_trace.as_str().unwrap()

---
0:00:00.000001: 1:accept: stream=0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: netbench/src/multiplex.rs
assertion_line: 598
assertion_line: 641
expression: client_trace.as_str().unwrap()

---
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: netbench/src/multiplex.rs
assertion_line: 598
assertion_line: 641
expression: server_trace.as_str().unwrap()

---
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: netbench/src/multiplex.rs
assertion_line: 596
assertion_line: 622
expression: client_trace.as_str().unwrap()

---
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: netbench/src/multiplex.rs
assertion_line: 596
assertion_line: 622
expression: server_trace.as_str().unwrap()

---
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
source: netbench/src/multiplex.rs
assertion_line: 617
expression: (client_trace.1).as_str().unwrap()
assertion_line: 667
expression: client_trace.as_str().unwrap()

---
0:00:00.000001: 0:0.0:exec: Scope { threads: [[OpenSendStream { stream_id: 0 }, SendRate { stream_id: 0, rate: Rate { bytes: Byte(100), period: 50ms } }, Send { stream_id: 0, bytes: Byte(1000) }, SendFinish { stream_id: 0 }], [OpenSendStream { stream_id: 1 }, SendRate { stream_id: 1, rate: Rate { bytes: Byte(100), period: 50ms } }, Send { stream_id: 1, bytes: Byte(1000) }, SendFinish { stream_id: 1 }], [OpenSendStream { stream_id: 2 }, SendRate { stream_id: 2, rate: Rate { bytes: Byte(100), period: 50ms } }, Send { stream_id: 2, bytes: Byte(1000) }, SendFinish { stream_id: 2 }]] }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
source: netbench/src/multiplex.rs
assertion_line: 619
expression: (server_trace.1).as_str().unwrap()
assertion_line: 667
expression: server_trace.as_str().unwrap()

---
0:00:00.000001: 1:accept: stream=0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: netbench/src/multiplex.rs
assertion_line: 610
expression: client_trace.as_str().unwrap()

---
0:00:00.000001: 0:0.0:exec: OpenSendStream { stream_id: 0 }
0:00:00.000001: 0:0.0:open: stream=0
0:00:00.000001: 0:0.0:exec: Send { stream_id: 0, bytes: Byte(1000) }
0:00:00.000001: 0:0.0:send: stream=0, len=1000
0:00:00.000001: 0:0.0:exec: SendFinish { stream_id: 0 }
0:00:00.000001: 0:0.0:send finish: stream=0

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
source: netbench/src/multiplex.rs
assertion_line: 610
expression: server_trace.as_str().unwrap()

---
0:00:00.000001: 1:accept: stream=0
0:00:00.000001: 1:1.0:exec: ReceiveRate { stream_id: 0, rate: Rate { bytes: Byte(100), period: 50ms } }
0:00:00.000001: 1:1.0:exec: Receive { stream_id: 0, bytes: Byte(1000) }
0:00:00.000001: 1:1.0:recv: stream=0, len=100
0:00:00.050001: 1:1.0:recv: stream=0, len=100
0:00:00.100001: 1:1.0:recv: stream=0, len=100
0:00:00.150001: 1:1.0:recv: stream=0, len=100
0:00:00.200001: 1:1.0:recv: stream=0, len=100
0:00:00.250001: 1:1.0:recv: stream=0, len=100
0:00:00.300001: 1:1.0:recv: stream=0, len=100
0:00:00.350001: 1:1.0:recv: stream=0, len=100
0:00:00.400001: 1:1.0:recv: stream=0, len=100
0:00:00.450001: 1:1.0:recv: stream=0, len=100
0:00:00.450001: 1:1.0:exec: ReceiveFinish { stream_id: 0 }
0:00:00.450001: 1:1.0:recv finish: stream=0

Loading

0 comments on commit 821f5c2

Please sign in to comment.