This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Print "stalled" task on shutdown #13022

Merged
merged 3 commits on Dec 28, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions client/cli/Cargo.toml
@@ -50,6 +50,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" }
[dev-dependencies]
tempfile = "3.1.0"
futures-timer = "3.0.1"
sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }

[features]
default = ["rocksdb"]
117 changes: 93 additions & 24 deletions client/cli/src/runner.rs
@@ -152,10 +152,38 @@ impl<C: SubstrateCli> Runner<C> {
//
// This is important to be done before we instruct the tokio runtime to shut down. Otherwise
// the tokio runtime will wait the full 60 seconds for all tasks to stop.
let task_registry = task_manager.into_task_registry();

// Give all futures 60 seconds to shut down, before tokio "leaks" them.
let shutdown_timeout = Duration::from_secs(60);
self.tokio_runtime.shutdown_timeout(shutdown_timeout);

let running_tasks = task_registry.running_tasks();

if !running_tasks.is_empty() {
log::error!("Detected running(potentially stalled) tasks on shutdown:");
running_tasks.iter().for_each(|(task, count)| {
let instances_desc =
if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };

if task.is_default_group() {
log::error!(
"Task \"{}\" was still running {}after waiting {} seconds to finish.",
task.name,
instances_desc,
shutdown_timeout.as_secs(),
);
} else {
log::error!(
"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
task.name,
task.group,
instances_desc,
shutdown_timeout.as_secs(),
);
}
});
}

res.map_err(Into::into)
}
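
For illustration, with the logging added above, a shutdown that still has live tasks after the timeout would produce output roughly like the following (task and group names here are purely hypothetical, log prefixes omitted):

Detected running (potentially stalled) tasks on shutdown:
Task "example-worker" was still running with 2 instances after waiting 60 seconds to finish.
Task "example-import" (Group: example-group) was still running after waiting 60 seconds to finish.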
@@ -388,34 +416,75 @@ mod tests {
assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
}

fn run_test_in_another_process(
test_name: &str,
test_body: impl FnOnce(),
) -> Option<std::process::Output> {
if std::env::var("RUN_FORKED_TEST").is_ok() {
test_body();
None
} else {
let output = std::process::Command::new(std::env::current_exe().unwrap())
.arg(test_name)
.env("RUN_FORKED_TEST", "1")
.output()
.unwrap();

assert!(output.status.success());
Some(output)
}
}

/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
/// seconds, i.e. it doesn't wait until they are finished (which may never happen).
#[test]
fn ensure_run_until_exit_is_not_blocking_indefinitely() {
let output = run_test_in_another_process(
"ensure_run_until_exit_is_not_blocking_indefinitely",
|| {
sp_tracing::try_init_simple();

let runner = create_runner();

runner
.run_node_until_exit(move |cfg| async move {
let task_manager =
TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
let (sender, receiver) = futures::channel::oneshot::channel();

// We need to use `spawn_blocking` here so that we get a dedicated thread
// for our future. This future is effectively blocking code that will never end.
task_manager.spawn_handle().spawn_blocking("test", None, async move {
let _ = sender.send(());
loop {
std::thread::sleep(Duration::from_secs(30));
}
});

task_manager.spawn_essential_handle().spawn_blocking(
"test2",
None,
async {
// Let's stop this essential task as soon as our other task has
// started. It will signal that the task manager should end.
let _ = receiver.await;
},
);

Ok::<_, sc_service::Error>(task_manager)
})
.unwrap_err();
},
);

let Some(output) = output else { return };

let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

assert!(
stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
);
assert!(!stderr
.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
}
}
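
The body of this test has to run in a child process: it deliberately leaves a blocked thread behind, waits out the full 60 second shutdown timeout, and the parent then inspects the child's stderr after the runner has exited. Below is a minimal, standalone sketch of the same fork-and-re-exec pattern used by run_test_in_another_process above (the test name and message are purely illustrative and not part of this diff); it writes directly to stderr so the output is not swallowed by the libtest capture:

#[test]
fn forked_test_example() {
    use std::io::Write;

    if std::env::var("RUN_FORKED_TEST").is_ok() {
        // Child process: run the actual test body.
        std::io::stderr().write_all(b"hello from the child process\n").unwrap();
        return
    }

    // Parent process: re-execute the current test binary, filtered to this test,
    // with the marker environment variable set, and capture the child's output.
    let output = std::process::Command::new(std::env::current_exe().unwrap())
        .arg("forked_test_example")
        .env("RUN_FORKED_TEST", "1")
        .output()
        .unwrap();

    assert!(output.status.success());
    assert!(String::from_utf8(output.stderr).unwrap().contains("hello from the child process"));
}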
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
@@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};

const DEFAULT_PROTOCOL_ID: &str = "sup";

98 changes: 97 additions & 1 deletion client/service/src/task_manager/mod.rs
@@ -24,12 +24,19 @@ use futures::{
future::{pending, select, try_join_all, BoxFuture, Either},
Future, FutureExt, StreamExt,
};
use parking_lot::Mutex;
use prometheus_endpoint::{
exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
Registry, U64,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
collections::{hash_map::Entry, HashMap},
panic,
pin::Pin,
result::Result,
sync::Arc,
};
use tokio::runtime::Handle;
use tracing_futures::Instrument;

@@ -72,6 +79,7 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
tokio_handle: Handle,
metrics: Option<Metrics>,
task_registry: TaskRegistry,
}

impl SpawnTaskHandle {
@@ -113,6 +121,7 @@
) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
let registry = self.task_registry.clone();

let group = match group.into() {
GroupName::Specific(var) => var,
@@ -129,6 +138,10 @@
}

let future = async move {
// Register the task and keep the "token" alive until the task is ended. Then this
// "token" will unregister this task.
let _registry_token = registry.register_task(name, group);

if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
@@ -298,6 +311,8 @@ pub struct TaskManager {
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<TaskManager>,
/// The registry of all running tasks.
task_registry: TaskRegistry,
}

impl TaskManager {
@@ -324,6 +339,7 @@
essential_failed_rx,
keep_alive: Box::new(()),
children: Vec::new(),
task_registry: Default::default(),
})
}

@@ -333,6 +349,7 @@
on_exit: self.on_exit.clone(),
tokio_handle: self.tokio_handle.clone(),
metrics: self.metrics.clone(),
task_registry: self.task_registry.clone(),
}
}

@@ -385,6 +402,14 @@
pub fn add_child(&mut self, child: TaskManager) {
self.children.push(child);
}

/// Consume `self` and return the [`TaskRegistry`].
///
/// This [`TaskRegistry`] can be used to check for still-running tasks after this task manager
/// has been dropped.
pub fn into_task_registry(self) -> TaskRegistry {
self.task_registry
}
}

#[derive(Clone)]
@@ -434,3 +459,74 @@ impl Metrics {
})
}
}

/// Ensures that a [`Task`] is unregistered when this object is dropped.
struct UnregisterOnDrop {
task: Task,
registry: TaskRegistry,
}

impl Drop for UnregisterOnDrop {
fn drop(&mut self) {
let mut tasks = self.registry.tasks.lock();

if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
*entry.get_mut() -= 1;

if *entry.get() == 0 {
entry.remove();
}
}
}
}

/// Represents a running async task in the [`TaskManager`].
///
/// As a task is only identified by a name and a group, it is entirely valid for multiple tasks
/// with the same name and group to exist.
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct Task {
/// The name of the task.
pub name: &'static str,
/// The group this task is associated to.
pub group: &'static str,
}

impl Task {
/// Returns `true` if the `group` is the [`DEFAULT_GROUP_NAME`].
pub fn is_default_group(&self) -> bool {
self.group == DEFAULT_GROUP_NAME
}
}

/// Keeps track of all running [`Task`]s in [`TaskManager`].
#[derive(Clone, Default)]
pub struct TaskRegistry {
tasks: Arc<Mutex<HashMap<Task, usize>>>,
}

impl TaskRegistry {
/// Register a task with the given `name` and `group`.
///
/// Returns [`UnregisterOnDrop`] that ensures that the task is unregistered when this value is
/// dropped.
fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
let task = Task { name, group };

{
let mut tasks = self.tasks.lock();

*(*tasks).entry(task.clone()).or_default() += 1;
}

UnregisterOnDrop { task, registry: self.clone() }
}

/// Returns the running tasks.
///
/// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The
/// count per task represents the number of concurrently running tasks with the same identifier.
pub fn running_tasks(&self) -> HashMap<Task, usize> {
(*self.tasks.lock()).clone()
}
}
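
For reference, here is a small self-contained sketch (not part of this diff, and using the standard library Mutex instead of parking_lot) of the same counting pattern that TaskRegistry and UnregisterOnDrop implement above: registering bumps a per-task counter and hands back a guard, dropping the guard decrements the counter and removes the entry once it reaches zero, and whatever is still in the map after the shutdown timeout is what gets reported as potentially stalled.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Shared registry counting how many instances of each named task are alive.
#[derive(Clone, Default)]
struct Registry(Arc<Mutex<HashMap<&'static str, usize>>>);

/// Guard returned by `register`; dropping it decrements the counter again.
struct Token {
    name: &'static str,
    registry: Registry,
}

impl Registry {
    fn register(&self, name: &'static str) -> Token {
        *self.0.lock().unwrap().entry(name).or_default() += 1;
        Token { name, registry: self.clone() }
    }

    fn running(&self) -> HashMap<&'static str, usize> {
        self.0.lock().unwrap().clone()
    }
}

impl Drop for Token {
    fn drop(&mut self) {
        let mut tasks = self.registry.0.lock().unwrap();
        if let Some(count) = tasks.get_mut(self.name) {
            *count -= 1;
            if *count == 0 {
                tasks.remove(self.name);
            }
        }
    }
}

fn main() {
    let registry = Registry::default();

    // Two instances of the same task are tracked as a single entry with a count of two.
    let first = registry.register("example-task");
    let _second = registry.register("example-task");
    assert_eq!(registry.running()["example-task"], 2);

    // Dropping one guard decrements the count; dropping the last one removes the entry.
    drop(first);
    assert_eq!(registry.running()["example-task"], 1);
}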