diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 9bdb6dca85..7d30017bb4 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -138,8 +138,12 @@ pub(crate) fn create_client( } impl super::Client for Connection { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_input(&mut self, dgram: &Datagram, now: Instant) { + self.process_input(dgram, now); } fn close(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S) diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index c88a8448f6..486992c51d 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -115,8 +115,12 @@ impl super::Client for Http3Client { matches!(self.state(), Http3State::Closed(..)) } - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_input(&mut self, dgram: &Datagram, now: Instant) { + self.process_input(dgram, now); } fn close(&mut self, now: Instant, app_error: AppError, msg: S) diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 81721802e1..7cb7e3c9ef 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -367,7 +367,8 @@ trait Handler { /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process_output(&mut self, now: Instant) -> Output; + fn process_input(&mut self, dgram: &Datagram, now: Instant); fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; @@ -404,7 +405,7 @@ impl<'a, H: Handler> Runner<'a, H> { } } - self.process(None).await?; + self.process_output().await?; if self.client.is_closed() { if self.args.stats { @@ -414,16 +415,7 @@ impl<'a, H: Handler> Runner<'a, H> { } match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, + Ready::Socket => self.process_input().await?, Ready::Timeout => { self.timeout = None; } @@ -431,9 +423,9 @@ impl<'a, H: Handler> Runner<'a, H> { } } - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + async fn process_output(&mut self) -> Result<(), io::Error> { loop { - match self.client.process(dgram.take(), Instant::now()) { + match self.client.process_output(Instant::now()) { Output::Datagram(dgram) => { self.socket.writable().await?; self.socket.send(dgram)?; @@ -452,6 +444,21 @@ impl<'a, H: Handler> Runner<'a, H> { Ok(()) } + + async fn process_input(&mut self) -> Res<()> { + loop { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { + break; + } + for dgram in &dgrams { + self.client.process_input(dgram, Instant::now()); + } + self.handler.maybe_key_update(&mut self.client)?; + } + + Ok(()) + } } fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res {