Skip to content

Commit

Permalink
Optionally log in yield_periodically
Browse files Browse the repository at this point in the history
Summary:
## This stack

I am trying to debug a case of some methods stalling the entire reactor. See https://fb.workplace.com/groups/1708850869939124/permalink/1836334797190730/ for context.

## This diff

SSIA

Reviewed By: mitrandir77

Differential Revision: D66171895

fbshipit-source-id: 4d5d37c057996c4c7798f198046c5a7550f351f0
  • Loading branch information
andreacampi authored and facebook-github-bot committed Nov 22, 2024
1 parent 02c3694 commit fabc3b2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 8 deletions.
2 changes: 2 additions & 0 deletions shed/futures_ext/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ rust_library(
deps = [
"fbsource//third-party/rust:anyhow",
"fbsource//third-party/rust:futures",
"fbsource//third-party/rust:maybe-owned",
"fbsource//third-party/rust:pin-project",
"fbsource//third-party/rust:slog",
"fbsource//third-party/rust:thiserror",
"fbsource//third-party/rust:tokio",
"//common/rust/shed/shared_error:shared_error",
Expand Down
2 changes: 2 additions & 0 deletions shed/futures_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1.0.86"
futures = { version = "0.3.30", features = ["async-await", "compat"] }
maybe-owned = "0.3.4"
pin-project = "0.4.30"
shared_error = { version = "0.1.0", path = "../shared_error" }
slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
thiserror = "1.0.64"
tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] }

Expand Down
15 changes: 13 additions & 2 deletions shed/futures_ext/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,22 @@ pub trait FbStreamExt: Stream {
}

/// Construct a new [self::yield_periodically::YieldPeriodically], with a sensible default.
fn yield_periodically(self) -> YieldPeriodically<Self>
#[track_caller]
fn yield_periodically<'a>(self) -> YieldPeriodically<'a, Self>
where
Self: Sized,
{
YieldPeriodically::new(self, Duration::from_millis(10))
let location = std::panic::Location::caller();

let location = slog::RecordLocation {
file: location.file(),
line: location.line(),
column: location.column(),
function: "",
module: "",
};

YieldPeriodically::new(self, location, Duration::from_millis(10))
}
}

Expand Down
92 changes: 86 additions & 6 deletions shed/futures_ext/src/stream/yield_periodically.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,27 @@
* of this source tree.
*/

use std::fmt::Arguments;
use std::pin::Pin;
use std::time::Duration;
use std::time::Instant;

use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use maybe_owned::MaybeOwned;
use pin_project::pin_project;
use slog::Logger;
use slog::Record;

/// If the budget is exceeded, we will log a warning if the total overshoot is more than this multiplier.
const BUDGET_OVERSHOOT_MULTIPLIER: f32 = 3.0;

/// A stream that will yield control back to the caller if it runs for more than a given duration
/// without yielding (i.e. returning Poll::Pending). The clock starts counting the first time the
/// stream is polled, and is reset every time the stream yields.
#[pin_project]
pub struct YieldPeriodically<S> {
pub struct YieldPeriodically<'a, S> {
#[pin]
inner: S,
/// Default budget.
Expand All @@ -29,21 +36,52 @@ pub struct YieldPeriodically<S> {
current_budget: Duration,
/// Whether the next iteration must yield because the budget was exceeded.
must_yield: bool,
/// The code location where yield_periodically was called.
location: slog::RecordLocation,
/// Enable logging to the provided logger.
logger: Option<MaybeOwned<'a, Logger>>,
/// The threshold for logging.
log_threshold: Duration,
}

impl<S> YieldPeriodically<S> {
impl<S> YieldPeriodically<'_, S> {
/// Create a new [YieldPeriodically].
pub fn new(inner: S, budget: Duration) -> Self {
pub fn new(inner: S, location: slog::RecordLocation, budget: Duration) -> Self {
let multiplier = BUDGET_OVERSHOOT_MULTIPLIER + 1.0;

Self {
inner,
budget,
current_budget: budget,
must_yield: false,
location,
logger: None,
log_threshold: Duration::from_millis(
budget.mul_f32(multiplier).as_millis().try_into().unwrap(),
),
}
}

/// Set the budget for this stream.
pub fn with_budget(mut self, budget: Duration) -> Self {
self.budget = budget;
self.current_budget = budget;
self
}

/// Enable debug logging.
pub fn with_logger<'a, L>(self, logger: L) -> YieldPeriodically<'a, S>
where
L: Into<MaybeOwned<'a, Logger>>,
{
YieldPeriodically {
logger: Some(logger.into()),
..self
}
}
}

impl<S: Stream> Stream for YieldPeriodically<S> {
impl<S: Stream> Stream for YieldPeriodically<'_, S> {
type Item = <S as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -63,11 +101,22 @@ impl<S: Stream> Stream for YieldPeriodically<S> {
return res;
}

let current_budget = *this.current_budget;
let elapsed = now.elapsed();

match this.current_budget.checked_sub(elapsed) {
Some(new_budget) => *this.current_budget = new_budget,
None => {
if elapsed > *this.log_threshold {
maybe_log(
this.logger,
this.location,
&format_args!(
"yield_periodically(): budget overshot: current_budget={:?}, elapsed={:?}",
current_budget, elapsed,
),
);
}
*this.must_yield = true;
*this.current_budget = *this.budget;
}
Expand All @@ -77,6 +126,24 @@ impl<S: Stream> Stream for YieldPeriodically<S> {
}
}

fn maybe_log(
logger: &Option<MaybeOwned<'_, Logger>>,
location: &slog::RecordLocation,
fmt: &Arguments<'_>,
) {
if let Some(logger) = &logger {
logger.log(&Record::new(
&slog::RecordStatic {
location,
level: slog::Level::Warning,
tag: "futures_watchdog",
},
fmt,
slog::b!(),
));
}
}

#[cfg(test)]
mod test {
use futures::stream::StreamExt;
Expand All @@ -90,7 +157,8 @@ mod test {
std::thread::sleep(Duration::from_millis(1));
});

let stream = YieldPeriodically::new(stream, Duration::from_millis(100));
let stream =
YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(100));

futures::pin_mut!(stream);

Expand Down Expand Up @@ -131,7 +199,19 @@ mod test {
})
.take(30);

let stream = YieldPeriodically::new(stream, Duration::from_millis(10));
let stream = YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(10));
stream.collect::<Vec<_>>().await;
}

#[track_caller]
fn location_for_test() -> slog::RecordLocation {
let location = std::panic::Location::caller();
slog::RecordLocation {
file: location.file(),
line: location.line(),
column: location.column(),
function: "",
module: "",
}
}
}

0 comments on commit fabc3b2

Please sign in to comment.