Skip to content

Commit

Permalink
rt: add Runtime::shutdown_timeout (#2186)
Browse files Browse the repository at this point in the history
Provides an API for forcing a runtime to shutdown even if there are
still running tasks.
  • Loading branch information
carllerche authored Jan 29, 2020
1 parent 560d0fa commit 9d6b994
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 14 deletions.
10 changes: 2 additions & 8 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ pub(crate) struct ParkThread {
inner: Arc<Inner>,
}

/// Error returned by `ParkThread`
///
/// This currently is never returned, but might at some point in the future.
#[derive(Debug)]
pub(crate) struct ParkError {
_p: (),
}
pub(crate) type ParkError = ();

/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -240,7 +234,7 @@ cfg_blocking_impl! {
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
.map_err(|_| ParkError { _p: () })
.map_err(|_| ())
}
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cfg_blocking_impl! {

cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}
Expand All @@ -35,5 +36,8 @@ cfg_not_blocking_impl! {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}

pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
}
}
}
19 changes: 15 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,30 @@ impl BlockingPool {
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}
}

impl Drop for BlockingPool {
fn drop(&mut self) {
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
let mut shared = self.spawner.inner.shared.lock().unwrap();

// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
// prevents shutting down twice.
if shared.shutdown {
return;
}

shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();

drop(shared);

self.shutdown_rx.wait();
self.shutdown_rx.wait(timeout);
}
}

impl Drop for BlockingPool {
fn drop(&mut self) {
self.shutdown(None);
}
}

Expand Down
14 changes: 12 additions & 2 deletions tokio/src/runtime/blocking/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use crate::loom::sync::Arc;
use crate::sync::oneshot;

use std::time::Duration;

#[derive(Debug, Clone)]
pub(super) struct Sender {
tx: Arc<oneshot::Sender<()>>,
Expand All @@ -26,7 +28,11 @@ pub(super) fn channel() -> (Sender, Receiver) {

impl Receiver {
/// Blocks the current thread until all `Sender` handles drop.
pub(crate) fn wait(&mut self) {
///
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
use crate::runtime::enter::{enter, try_enter};

let mut e = if std::thread::panicking() {
Expand All @@ -43,6 +49,10 @@ impl Receiver {
// If blocking fails to wait, this indicates a problem parking the
// current thread (usually, shutting down a runtime stored in a
// thread-local).
let _ = e.block_on(&mut self.rx);
if let Some(timeout) = timeout {
let _ = e.block_on_timeout(&mut self.rx, timeout);
} else {
let _ = e.block_on(&mut self.rx);
}
}
}
39 changes: 39 additions & 0 deletions tokio/src/runtime/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {

cfg_blocking_impl! {
use crate::park::ParkError;
use std::time::Duration;

impl Enter {
/// Blocks the thread on the specified future, returning the value with
Expand Down Expand Up @@ -104,6 +105,44 @@ cfg_blocking_impl! {
park.park()?;
}
}

/// Blocks the thread on the specified future for **at most** `timeout`
///
/// If the future completes before `timeout`, the result is returned. If
/// `timeout` elapses, then `Err` is returned.
pub(crate) fn block_on_timeout<F>(&mut self, mut f: F, timeout: Duration) -> Result<F::Output, ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;
use std::time::Instant;

let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);

// `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
// no longer be accessed, making the pinning safe.
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let when = Instant::now() + timeout;

loop {
if let Ready(v) = f.as_mut().poll(&mut cx) {
return Ok(v);
}

let now = Instant::now();

if now >= when {
return Err(());
}

park.park_timeout(when - now)?;
}
}
}
}

Expand Down
43 changes: 43 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ cfg_rt_core! {
}

use std::future::Future;
use std::time::Duration;

/// The Tokio runtime.
///
Expand Down Expand Up @@ -441,4 +442,46 @@ impl Runtime {
pub fn handle(&self) -> &Handle {
&self.handle
}

/// Shutdown the runtime, waiting for at most `duration` for all spawned
/// task to shutdown.
///
/// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
/// shutdown in a timely fashion. However, dropping a `Runtime` will wait
/// indefinitely for all tasks to terminate, and there are cases where a long
/// blocking task has been spawned which can block dropping `Runtime`.
///
/// In this case, calling `shutdown_timeout` with an explicit wait timeout
/// can work. The `shutdown_timeout` will signal all tasks to shutdown and
/// will wait for at most `duration` for all spawned tasks to terminate. If
/// `timeout` elapses before all tasks are dropped, the function returns and
/// outstanding tasks are potentially leaked.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// use std::thread;
/// use std::time::Duration;
///
/// fn main() {
/// let mut runtime = Runtime::new().unwrap();
///
/// runtime.block_on(async move {
/// task::spawn_blocking(move || {
/// thread::sleep(Duration::from_secs(10_000));
/// });
/// });
///
/// runtime.shutdown_timeout(Duration::from_millis(100));
/// }
/// ```
pub fn shutdown_timeout(self, duration: Duration) {
let Runtime {
mut blocking_pool, ..
} = self;
blocking_pool.shutdown(Some(duration));
}
}
17 changes: 17 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,23 @@ rt_test! {
}
}

#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
let mut runtime = rt();

runtime.block_on(async move {
task::spawn_blocking(move || {
tx.send(()).unwrap();
thread::sleep(Duration::from_secs(10_000));
});

rx.await.unwrap();
});

runtime.shutdown_timeout(Duration::from_millis(100));
}

#[test]
fn runtime_in_thread_local() {
use std::cell::RefCell;
Expand Down

0 comments on commit 9d6b994

Please sign in to comment.