Skip to content

feat(server): add graceful::Watcher type #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions src/server/graceful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ use pin_project_lite::pin_project;
use tokio::sync::watch;

/// A graceful shutdown utility
// Purposefully not `Clone`, see `watcher()` method for why.
pub struct GracefulShutdown {
tx: watch::Sender<()>,
}

/// A watcher side of the graceful shutdown.
///
/// This type can only watch a connection, it cannot trigger a shutdown.
///
/// Call [`GracefulShutdown::watcher()`] to construct one of these.
pub struct Watcher {
rx: watch::Receiver<()>,
}

impl GracefulShutdown {
/// Create a new graceful shutdown helper.
pub fn new() -> Self {
Expand All @@ -30,12 +40,20 @@ impl GracefulShutdown {

/// Wrap a future for graceful shutdown watching.
pub fn watch<C: GracefulConnection>(&self, conn: C) -> impl Future<Output = C::Output> {
let mut rx = self.tx.subscribe();
GracefulConnectionFuture::new(conn, async move {
let _ = rx.changed().await;
// hold onto the rx until the watched future is completed
rx
})
self.watcher().watch(conn)
}

/// Create an owned type that can watch a connection.
///
/// This method allows created an owned type that can be sent onto another
/// task before calling [`Watcher::watch()`].
// Internal: this function exists because `Clone` allows footguns.
// If the `tx` were cloned (or the `rx`), race conditions can happens where
// one task starting a shutdown is scheduled and interwined with a task
// starting to watch a connection, and the "watch version" is one behind.
pub fn watcher(&self) -> Watcher {
let rx = self.tx.subscribe();
Watcher { rx }
}

/// Signal shutdown for all watched connections.
Expand Down Expand Up @@ -64,6 +82,24 @@ impl Default for GracefulShutdown {
}
}

impl Watcher {
/// Wrap a future for graceful shutdown watching.
pub fn watch<C: GracefulConnection>(self, conn: C) -> impl Future<Output = C::Output> {
let Watcher { mut rx } = self;
GracefulConnectionFuture::new(conn, async move {
let _ = rx.changed().await;
// hold onto the rx until the watched future is completed
rx
})
}
}

impl Debug for Watcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GracefulWatcher").finish()
}
}

pin_project! {
struct GracefulConnectionFuture<C, F: Future> {
#[pin]
Expand Down