From 7aa964189a9c8114ae1d79e28570e7a8c40e2fde Mon Sep 17 00:00:00 2001 From: Kershaw Chang Date: Thu, 26 Sep 2024 16:14:38 +0200 Subject: [PATCH 1/3] make process_output be able to return keep_alive timeout --- neqo-http3/src/connection_client.rs | 4 +- neqo-transport/src/connection/idle.rs | 15 +++++++ neqo-transport/src/connection/mod.rs | 11 ++++- neqo-transport/src/connection/tests/idle.rs | 45 ++++++++++++--------- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index 25840e91a6..1d5f6f7329 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -2620,7 +2620,7 @@ mod tests { force_idle(&mut client, &mut server); let idle_timeout = ConnectionParameters::default().get_idle_timeout(); - assert_eq!(client.process_output(now()).callback(), idle_timeout); + assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); } // Helper function: read response when a server sends HTTP_RESPONSE_2. @@ -5114,7 +5114,7 @@ mod tests { assert!(!fin); force_idle(&mut client, &mut server); - assert_eq!(client.process_output(now()).callback(), idle_timeout); + assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); } #[test] diff --git a/neqo-transport/src/connection/idle.rs b/neqo-transport/src/connection/idle.rs index 5aaf1cb4d7..62c25573e8 100644 --- a/neqo-transport/src/connection/idle.rs +++ b/neqo-transport/src/connection/idle.rs @@ -96,6 +96,21 @@ impl IdleTimeout { self.start(now) + max(self.timeout / 2, pto) } + pub fn maybe_keep_alive_timeout(&self, now: Instant, pto: Duration) -> Option { + if self.keep_alive_outstanding { + return None; + } + + let timeout = self.keep_alive_timeout(now, pto); + // This could happen when the state is AckElicitingPacketSent(t) and + // the t exceeds keep_alive_timeout. + if timeout <= now { + return None; + } + + Some(timeout) + } + pub fn send_keep_alive( &mut self, now: Instant, diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index b933ebbbfd..ba61c999cd 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1065,9 +1065,18 @@ impl Connection { let pto = rtt.pto(self.confirmed()); let idle_time = self.idle_timeout.expiry(now, pto); - qtrace!([self], "Idle/keepalive timer {:?}", idle_time); + qtrace!([self], "Idle timer {:?}", idle_time); delays.push(idle_time); + let keep_alive = self.streams.need_keep_alive(); + if keep_alive { + if let Some(keep_alive_time) = self.idle_timeout.maybe_keep_alive_timeout(now, pto) + { + qtrace!([self], "Keep alive timer {:?}", keep_alive_time); + delays.push(keep_alive_time); + } + } + if let Some(lr_time) = self.loss_recovery.next_timeout(&path) { qtrace!([self], "Loss recovery timer {:?}", lr_time); delays.push(lr_time); diff --git a/neqo-transport/src/connection/tests/idle.rs b/neqo-transport/src/connection/tests/idle.rs index 8677d7f5d5..de14c9365a 100644 --- a/neqo-transport/src/connection/tests/idle.rs +++ b/neqo-transport/src/connection/tests/idle.rs @@ -412,8 +412,9 @@ fn keep_alive_initiator() { let stream = create_stream_idle(&mut server, &mut client); let mut now = now(); + // Marking the stream for keep-alive changes the idle timeout. server.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut server, now, default_timeout()); + assert_idle(&mut server, now, default_timeout() / 2); // Wait that long and the server should send a PING frame. now += default_timeout() / 2; @@ -427,8 +428,8 @@ fn keep_alive_initiator() { let out = server.process(out.as_ref(), now).dgram(); assert!(client.process(out.as_ref(), now).dgram().is_none()); - // Check that there will be next keep-alive ping after default_timeout(). - assert_idle(&mut server, now, default_timeout()); + // Check that there will be next keep-alive ping after default_timeout() / 2. + assert_idle(&mut server, now, default_timeout() / 2); now += default_timeout() / 2; let pings_before2 = server.stats().frame_tx.ping; let ping = server.process_output(now).dgram(); @@ -446,7 +447,7 @@ fn keep_alive_lost() { let mut now = now(); server.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut server, now, default_timeout()); + assert_idle(&mut server, now, default_timeout() / 2); // Wait that long and the server should send a PING frame. now += default_timeout() / 2; @@ -475,7 +476,7 @@ fn keep_alive_lost() { // return some small timeout for the recovry although it does not have // any outstanding data. Therefore we call it after AT_LEAST_PTO. now += AT_LEAST_PTO; - assert_idle(&mut server, now, default_timeout() - AT_LEAST_PTO); + assert_idle(&mut server, now, default_timeout() / 2 - AT_LEAST_PTO); } /// The other peer can also keep it alive. @@ -488,10 +489,11 @@ fn keep_alive_responder() { let mut now = now(); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now, default_timeout()); + assert_idle(&mut client, now, default_timeout() / 2); // Wait that long and the client should send a PING frame. now += default_timeout() / 2; + eprintln!("after wait"); let pings_before = client.stats().frame_tx.ping; let ping = client.process_output(now).dgram(); assert!(ping.is_some()); @@ -507,7 +509,7 @@ fn keep_alive_unmark() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_keep_alive(stream, false).unwrap(); assert_idle(&mut client, now(), default_timeout()); @@ -537,11 +539,11 @@ fn keep_alive_close() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_close_send(stream).unwrap(); transfer_force_idle(&mut client, &mut server); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); server.stream_close_send(stream).unwrap(); transfer_force_idle(&mut server, &mut client); @@ -558,11 +560,11 @@ fn keep_alive_reset() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_close_send(stream).unwrap(); transfer_force_idle(&mut client, &mut server); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); server.stream_reset_send(stream, 0).unwrap(); transfer_force_idle(&mut server, &mut client); @@ -584,7 +586,7 @@ fn keep_alive_stop_sending() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_close_send(stream).unwrap(); client.stream_stop_sending(stream, 0).unwrap(); @@ -608,14 +610,14 @@ fn keep_alive_multiple_stop() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); let other = client.stream_create(StreamType::BiDi).unwrap(); client.stream_keep_alive(other, true).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_keep_alive(stream, false).unwrap(); - assert_idle(&mut client, now(), default_timeout()); + assert_idle(&mut client, now(), default_timeout() / 2); client.stream_keep_alive(other, false).unwrap(); assert_idle(&mut client, now(), default_timeout()); @@ -686,8 +688,9 @@ fn keep_alive_with_ack_eliciting_packet_lost() { // Create a stream. let stream = client.stream_create(StreamType::BiDi).unwrap(); + // Marking the stream for keep-alive changes the idle timeout. client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now, IDLE_TIMEOUT); + assert_idle(&mut client, now, IDLE_TIMEOUT / 2); // Send data on the stream that will be lost. _ = client.stream_send(stream, DEFAULT_STREAM_DATA).unwrap(); @@ -702,11 +705,13 @@ fn keep_alive_with_ack_eliciting_packet_lost() { let retransmit = client.process_output(now).dgram(); assert!(retransmit.is_some()); - // The timeout is the twice the PTO, because we've already sent one probe. - assert_eq!(client.process_output(now).callback(), pto * 2); + // The next callback should be for an idle PING. + assert_eq!( + client.process_output(now).callback(), + IDLE_TIMEOUT / 2 - pto + ); - // Wait for half the idle timeout (less the PTO we've already waited) - // so that we get a keep-alive. + // Wait that long and the client should send a PING frame. now += IDLE_TIMEOUT / 2 - pto; let pings_before = client.stats().frame_tx.ping; let ping = client.process_output(now).dgram(); From bf22de36593c9c8efa9efbe9080fc84be816eb7b Mon Sep 17 00:00:00 2001 From: Kershaw Chang Date: Mon, 30 Sep 2024 20:35:21 +0200 Subject: [PATCH 2/3] address comments --- neqo-transport/src/connection/idle.rs | 4 ++-- neqo-transport/src/connection/mod.rs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/neqo-transport/src/connection/idle.rs b/neqo-transport/src/connection/idle.rs index 62c25573e8..8c5e030b04 100644 --- a/neqo-transport/src/connection/idle.rs +++ b/neqo-transport/src/connection/idle.rs @@ -102,8 +102,8 @@ impl IdleTimeout { } let timeout = self.keep_alive_timeout(now, pto); - // This could happen when the state is AckElicitingPacketSent(t) and - // the t exceeds keep_alive_timeout. + // Timer is in the past, i.e. we should have sent a keep alive, + // but we were unable to, e.g. due to CC. if timeout <= now { return None; } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index ba61c999cd..efc4bffca2 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1068,8 +1068,7 @@ impl Connection { qtrace!([self], "Idle timer {:?}", idle_time); delays.push(idle_time); - let keep_alive = self.streams.need_keep_alive(); - if keep_alive { + if self.streams.need_keep_alive() { if let Some(keep_alive_time) = self.idle_timeout.maybe_keep_alive_timeout(now, pto) { qtrace!([self], "Keep alive timer {:?}", keep_alive_time); From b5b887db8db5dabb277c0884ed74a3727c2137f2 Mon Sep 17 00:00:00 2001 From: Kershaw Chang Date: Wed, 2 Oct 2024 13:41:17 +0200 Subject: [PATCH 3/3] address more comments --- neqo-transport/src/connection/idle.rs | 2 +- neqo-transport/src/connection/mod.rs | 5 +-- neqo-transport/src/connection/tests/idle.rs | 48 +++++++++++---------- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/neqo-transport/src/connection/idle.rs b/neqo-transport/src/connection/idle.rs index 8c5e030b04..c5b570a09c 100644 --- a/neqo-transport/src/connection/idle.rs +++ b/neqo-transport/src/connection/idle.rs @@ -96,7 +96,7 @@ impl IdleTimeout { self.start(now) + max(self.timeout / 2, pto) } - pub fn maybe_keep_alive_timeout(&self, now: Instant, pto: Duration) -> Option { + pub fn next_keep_alive(&self, now: Instant, pto: Duration) -> Option { if self.keep_alive_outstanding { return None; } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index efc4bffca2..156c7de815 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1053,7 +1053,7 @@ impl Connection { return timeout.duration_since(now); } - let mut delays = SmallVec::<[_; 6]>::new(); + let mut delays = SmallVec::<[_; 7]>::new(); if let Some(ack_time) = self.acks.ack_time(now) { qtrace!([self], "Delayed ACK timer {:?}", ack_time); delays.push(ack_time); @@ -1069,8 +1069,7 @@ impl Connection { delays.push(idle_time); if self.streams.need_keep_alive() { - if let Some(keep_alive_time) = self.idle_timeout.maybe_keep_alive_timeout(now, pto) - { + if let Some(keep_alive_time) = self.idle_timeout.next_keep_alive(now, pto) { qtrace!([self], "Keep alive timer {:?}", keep_alive_time); delays.push(keep_alive_time); } diff --git a/neqo-transport/src/connection/tests/idle.rs b/neqo-transport/src/connection/tests/idle.rs index de14c9365a..dfb59235c8 100644 --- a/neqo-transport/src/connection/tests/idle.rs +++ b/neqo-transport/src/connection/tests/idle.rs @@ -30,6 +30,10 @@ fn default_timeout() -> Duration { ConnectionParameters::default().get_idle_timeout() } +fn keep_alive_timeout() -> Duration { + default_timeout() / 2 +} + fn test_idle_timeout(client: &mut Connection, server: &mut Connection, timeout: Duration) { assert!(timeout > Duration::from_secs(1)); connect_force_idle(client, server); @@ -414,10 +418,10 @@ fn keep_alive_initiator() { // Marking the stream for keep-alive changes the idle timeout. server.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut server, now, default_timeout() / 2); + assert_idle(&mut server, now, keep_alive_timeout()); // Wait that long and the server should send a PING frame. - now += default_timeout() / 2; + now += keep_alive_timeout(); let pings_before = server.stats().frame_tx.ping; let ping = server.process_output(now).dgram(); assert!(ping.is_some()); @@ -428,9 +432,9 @@ fn keep_alive_initiator() { let out = server.process(out.as_ref(), now).dgram(); assert!(client.process(out.as_ref(), now).dgram().is_none()); - // Check that there will be next keep-alive ping after default_timeout() / 2. - assert_idle(&mut server, now, default_timeout() / 2); - now += default_timeout() / 2; + // Check that there will be next keep-alive ping after keep_alive_timeout(). + assert_idle(&mut server, now, keep_alive_timeout()); + now += keep_alive_timeout(); let pings_before2 = server.stats().frame_tx.ping; let ping = server.process_output(now).dgram(); assert!(ping.is_some()); @@ -447,10 +451,10 @@ fn keep_alive_lost() { let mut now = now(); server.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut server, now, default_timeout() / 2); + assert_idle(&mut server, now, keep_alive_timeout()); // Wait that long and the server should send a PING frame. - now += default_timeout() / 2; + now += keep_alive_timeout(); let pings_before = server.stats().frame_tx.ping; let ping = server.process_output(now).dgram(); assert!(ping.is_some()); @@ -476,7 +480,7 @@ fn keep_alive_lost() { // return some small timeout for the recovry although it does not have // any outstanding data. Therefore we call it after AT_LEAST_PTO. now += AT_LEAST_PTO; - assert_idle(&mut server, now, default_timeout() / 2 - AT_LEAST_PTO); + assert_idle(&mut server, now, keep_alive_timeout() - AT_LEAST_PTO); } /// The other peer can also keep it alive. @@ -489,10 +493,10 @@ fn keep_alive_responder() { let mut now = now(); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now, default_timeout() / 2); + assert_idle(&mut client, now, keep_alive_timeout()); // Wait that long and the client should send a PING frame. - now += default_timeout() / 2; + now += keep_alive_timeout(); eprintln!("after wait"); let pings_before = client.stats().frame_tx.ping; let ping = client.process_output(now).dgram(); @@ -509,7 +513,7 @@ fn keep_alive_unmark() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_keep_alive(stream, false).unwrap(); assert_idle(&mut client, now(), default_timeout()); @@ -539,11 +543,11 @@ fn keep_alive_close() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_close_send(stream).unwrap(); transfer_force_idle(&mut client, &mut server); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); server.stream_close_send(stream).unwrap(); transfer_force_idle(&mut server, &mut client); @@ -560,19 +564,19 @@ fn keep_alive_reset() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_close_send(stream).unwrap(); transfer_force_idle(&mut client, &mut server); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); server.stream_reset_send(stream, 0).unwrap(); transfer_force_idle(&mut server, &mut client); assert_idle(&mut client, now(), default_timeout()); // The client will fade away from here. - let t = now() + (default_timeout() / 2); - assert_eq!(client.process_output(t).callback(), default_timeout() / 2); + let t = now() + keep_alive_timeout(); + assert_eq!(client.process_output(t).callback(), keep_alive_timeout()); let t = now() + default_timeout(); assert_eq!(client.process_output(t), Output::None); } @@ -586,7 +590,7 @@ fn keep_alive_stop_sending() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_close_send(stream).unwrap(); client.stream_stop_sending(stream, 0).unwrap(); @@ -610,14 +614,14 @@ fn keep_alive_multiple_stop() { let stream = create_stream_idle(&mut client, &mut server); client.stream_keep_alive(stream, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); let other = client.stream_create(StreamType::BiDi).unwrap(); client.stream_keep_alive(other, true).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_keep_alive(stream, false).unwrap(); - assert_idle(&mut client, now(), default_timeout() / 2); + assert_idle(&mut client, now(), keep_alive_timeout()); client.stream_keep_alive(other, false).unwrap(); assert_idle(&mut client, now(), default_timeout()); @@ -640,7 +644,7 @@ fn keep_alive_large_rtt() { endpoint.stream_keep_alive(stream, true).unwrap(); let delay = endpoint.process_output(now).callback(); qtrace!([endpoint], "new delay {:?}", delay); - assert!(delay > default_timeout() / 2); + assert!(delay > keep_alive_timeout()); assert!(delay > rtt); } }