Skip to content

Commit

Permalink
fix(ffi): hide some runtime code from ffi
Browse files Browse the repository at this point in the history
code that was previously not compiled for the C API due to the `runtime` flag was causing compilation errors
  • Loading branch information
oddgrd committed Sep 4, 2022
1 parent 2b5721a commit 860f0f3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 24 deletions.
4 changes: 4 additions & 0 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ fn new_ping_config(config: &Config) -> ping::Config {
} else {
None
},
#[cfg(not(feature = "ffi"))]
keep_alive_interval: config.keep_alive_interval,
#[cfg(not(feature = "ffi"))]
keep_alive_timeout: config.keep_alive_timeout,
#[cfg(not(feature = "ffi"))]
keep_alive_while_idle: config.keep_alive_while_idle,
}
}
Expand Down Expand Up @@ -137,6 +140,7 @@ where
conn.set_target_window_size(wnd);
conn.set_initial_window_size(wnd)?;
}
#[cfg(not(feature = "ffi"))]
Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
debug!("connection keep-alive timed out");
return Poll::Ready(Ok(()));
Expand Down
101 changes: 77 additions & 24 deletions src/proto/h2/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
/// 3b. Merge RTT with a running average.
/// 3c. Calculate bdp as bytes/rtt.
/// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
#[cfg(not(feature = "ffi"))]
use std::fmt;
#[cfg(not(feature = "ffi"))]
use std::future::Future;
#[cfg(not(feature = "ffi"))]
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
Expand All @@ -28,7 +31,9 @@ use std::time::{Duration, Instant};
use h2::{Ping, PingPong};
use tracing::{debug, trace};

#[cfg_attr(feature = "ffi", allow(unused))]
use crate::common::time::Time;
#[cfg_attr(feature = "ffi", allow(unused))]
use crate::rt::Sleep;

type WindowSize = u32;
Expand Down Expand Up @@ -57,6 +62,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
(None, None)
};

#[cfg(not(feature = "ffi"))]
let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
interval,
timeout: config.keep_alive_timeout,
Expand All @@ -66,11 +72,14 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
timer: __timer,
});

#[cfg(not(feature = "ffi"))]
let last_read_at = keep_alive.as_ref().map(|_| Instant::now());

let shared = Arc::new(Mutex::new(Shared {
bytes,
#[cfg(not(feature = "ffi"))]
last_read_at,
#[cfg(not(feature = "ffi"))]
is_keep_alive_timed_out: false,
ping_pong,
ping_sent_at: None,
Expand All @@ -83,6 +92,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
},
Ponger {
bdp,
#[cfg(not(feature = "ffi"))]
keep_alive,
shared,
},
Expand All @@ -93,11 +103,14 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
pub(super) struct Config {
pub(super) bdp_initial_window: Option<WindowSize>,
/// If no frames are received in this amount of time, a PING frame is sent.
#[cfg(not(feature = "ffi"))]
pub(super) keep_alive_interval: Option<Duration>,
/// After sending a keepalive PING, the connection will be closed if
/// a pong is not received in this amount of time.
#[cfg(not(feature = "ffi"))]
pub(super) keep_alive_timeout: Duration,
/// If true, sends pings even when there are no active streams.
#[cfg(not(feature = "ffi"))]
pub(super) keep_alive_while_idle: bool,
}

Expand All @@ -108,6 +121,7 @@ pub(crate) struct Recorder {

pub(super) struct Ponger {
bdp: Option<Bdp>,
#[cfg(not(feature = "ffi"))]
keep_alive: Option<KeepAlive>,
shared: Arc<Mutex<Shared>>,
}
Expand All @@ -127,8 +141,10 @@ struct Shared {
// keep-alive
/// If `Some`, keep-alive is enabled, and the Instant is how long ago
/// the connection read the last frame.
#[cfg(not(feature = "ffi"))]
last_read_at: Option<Instant>,

#[cfg(not(feature = "ffi"))]
is_keep_alive_timed_out: bool,
}

Expand All @@ -147,6 +163,7 @@ struct Bdp {
stable_count: u32,
}

#[cfg(not(feature = "ffi"))]
struct KeepAlive {
/// If no frames are received in this amount of time, a PING frame is sent.
interval: Duration,
Expand All @@ -155,11 +172,13 @@ struct KeepAlive {
timeout: Duration,
/// If true, sends pings even when there are no active streams.
while_idle: bool,

state: KeepAliveState,
sleep: Pin<Box<dyn Sleep>>,
timer: Time,
}

#[cfg(not(feature = "ffi"))]
enum KeepAliveState {
Init,
Scheduled(Instant),
Expand All @@ -168,17 +187,27 @@ enum KeepAliveState {

pub(super) enum Ponged {
SizeUpdate(WindowSize),
#[cfg(not(feature = "ffi"))]
KeepAliveTimedOut,
}

#[cfg(not(feature = "ffi"))]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;

// ===== impl Config =====

impl Config {
pub(super) fn is_enabled(&self) -> bool {
self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
#[cfg(not(feature = "ffi"))]
{
self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
}

#[cfg(feature = "ffi")]
{
self.bdp_initial_window.is_some()
}
}
}

Expand All @@ -194,6 +223,7 @@ impl Recorder {

let mut locked = shared.lock().unwrap();

#[cfg(not(feature = "ffi"))]
locked.update_last_read_at();

// are we ready to send another bdp ping?
Expand All @@ -220,15 +250,18 @@ impl Recorder {
}

pub(crate) fn record_non_data(&self) {
let shared = if let Some(ref shared) = self.shared {
shared
} else {
return;
};
#[cfg(not(feature = "ffi"))]
{
let shared = if let Some(ref shared) = self.shared {
shared
} else {
return;
};

let mut locked = shared.lock().unwrap();
let mut locked = shared.lock().unwrap();

locked.update_last_read_at();
locked.update_last_read_at();
}
}

/// If the incoming stream is already closed, convert self into
Expand All @@ -243,10 +276,13 @@ impl Recorder {
}

pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
if let Some(ref shared) = self.shared {
let locked = shared.lock().unwrap();
if locked.is_keep_alive_timed_out {
return Err(KeepAliveTimedOut.crate_error());
#[cfg(not(feature = "ffi"))]
{
if let Some(ref shared) = self.shared {
let locked = shared.lock().unwrap();
if locked.is_keep_alive_timed_out {
return Err(KeepAliveTimedOut.crate_error());
}
}
}

Expand All @@ -261,11 +297,15 @@ impl Ponger {
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
let now = Instant::now();
let mut locked = self.shared.lock().unwrap();
#[cfg(not(feature = "ffi"))]
let is_idle = self.is_idle();

if let Some(ref mut ka) = self.keep_alive {
ka.maybe_schedule(is_idle, &locked);
ka.maybe_ping(cx, &mut locked);
#[cfg(not(feature = "ffi"))]
{
if let Some(ref mut ka) = self.keep_alive {
ka.maybe_schedule(is_idle, &locked);
ka.maybe_ping(cx, &mut locked);
}
}

if !locked.is_ping_sent() {
Expand All @@ -282,10 +322,13 @@ impl Ponger {
let rtt = now - start;
trace!("recv pong");

if let Some(ref mut ka) = self.keep_alive {
locked.update_last_read_at();
ka.maybe_schedule(is_idle, &locked);
ka.maybe_ping(cx, &mut locked);
#[cfg(not(feature = "ffi"))]
{
if let Some(ref mut ka) = self.keep_alive {
locked.update_last_read_at();
ka.maybe_schedule(is_idle, &locked);
ka.maybe_ping(cx, &mut locked);
}
}

if let Some(ref mut bdp) = self.bdp {
Expand All @@ -304,11 +347,14 @@ impl Ponger {
debug!("pong error: {}", e);
}
Poll::Pending => {
if let Some(ref mut ka) = self.keep_alive {
if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
self.keep_alive = None;
locked.is_keep_alive_timed_out = true;
return Poll::Ready(Ponged::KeepAliveTimedOut);
#[cfg(not(feature = "ffi"))]
{
if let Some(ref mut ka) = self.keep_alive {
if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
self.keep_alive = None;
locked.is_keep_alive_timed_out = true;
return Poll::Ready(Ponged::KeepAliveTimedOut);
}
}
}
}
Expand All @@ -318,6 +364,7 @@ impl Ponger {
Poll::Pending
}

#[cfg(not(feature = "ffi"))]
fn is_idle(&self) -> bool {
Arc::strong_count(&self.shared) <= 2
}
Expand All @@ -342,12 +389,14 @@ impl Shared {
self.ping_sent_at.is_some()
}

#[cfg(not(feature = "ffi"))]
fn update_last_read_at(&mut self) {
if self.last_read_at.is_some() {
self.last_read_at = Some(Instant::now());
}
}

#[cfg(not(feature = "ffi"))]
fn last_read_at(&self) -> Instant {
self.last_read_at.expect("keep_alive expects last_read_at")
}
Expand Down Expand Up @@ -423,6 +472,7 @@ fn seconds(dur: Duration) -> f64 {

// ===== impl KeepAlive =====

#[cfg(not(feature = "ffi"))]
impl KeepAlive {
fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
match self.state {
Expand Down Expand Up @@ -487,18 +537,21 @@ impl KeepAlive {

// ===== impl KeepAliveTimedOut =====

#[cfg(not(feature = "ffi"))]
impl KeepAliveTimedOut {
pub(super) fn crate_error(self) -> crate::Error {
crate::Error::new(crate::error::Kind::Http2).with(self)
}
}

#[cfg(not(feature = "ffi"))]
impl fmt::Display for KeepAliveTimedOut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("keep-alive timed out")
}
}

#[cfg(not(feature = "ffi"))]
impl std::error::Error for KeepAliveTimedOut {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&crate::error::TimedOut)
Expand Down
4 changes: 4 additions & 0 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,13 @@ where

let ping_config = ping::Config {
bdp_initial_window: bdp,
#[cfg(not(feature = "ffi"))]
keep_alive_interval: config.keep_alive_interval,
#[cfg(not(feature = "ffi"))]
keep_alive_timeout: config.keep_alive_timeout,
// If keep-alive is enabled for servers, always enabled while
// idle, so it can more aggressively close dead connections.
#[cfg(not(feature = "ffi"))]
keep_alive_while_idle: true,
};

Expand Down Expand Up @@ -360,6 +363,7 @@ where
self.conn.set_target_window_size(wnd);
let _ = self.conn.set_initial_window_size(wnd);
}
#[cfg(not(feature = "ffi"))]
Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
debug!("keep-alive timed out, closing connection");
self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
Expand Down

0 comments on commit 860f0f3

Please sign in to comment.