From 894768e5d6bb50c6f8657e16aeeef969cb480a99 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 30 Jul 2020 14:33:15 +0200 Subject: [PATCH 01/14] Initial commit Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master From 18c9792f008a6c622f1c9d650c7d9667c1dfd850 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 30 Jul 2020 16:03:53 +0200 Subject: [PATCH 02/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index e0e8699ce1d37..cc52db39f31de 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -18,7 +18,7 @@ use exit_future::Signal; use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, - future::{select, Either, BoxFuture}, + future::{select, Either, BoxFuture, select_all}, sink::SinkExt, }; use prometheus_endpoint::{ @@ -214,8 +214,14 @@ pub struct TaskManager { essential_failed_rx: TracingUnboundedReceiver<()>, /// Things to keep alive until the task manager is dropped. keep_alive: Box, + /// A sender to a stream of background tasks. This is used for the completion future. task_notifier: TracingUnboundedSender, + /// This future will complete when all the tasks are joined and the stream is closed. completion_future: JoinFuture, + /// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent + /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential + /// task fails. + children: Vec>, } impl TaskManager { @@ -251,6 +257,7 @@ impl TaskManager { keep_alive: Box::new(()), task_notifier, completion_future, + children: Vec::new(), }) } @@ -271,6 +278,13 @@ impl TaskManager { /// Send the signal for termination, prevent new tasks to be created, await for all the existing /// tasks to be finished and drop the object. You can consider this as an async drop. + /// + /// It's always better to call and await this function before exiting the process as background + /// tasks may be running in the background. If the process exit and the background tasks are not + /// cancelled, this will lead to objects not getting dropped properly. + /// + /// This is an issue in some cases as some of our dependencies do require that we drop all the + /// objects properly otherwise it triggers a SIGABRT on exit. pub fn clean_shutdown(mut self) -> Pin + Send>> { self.terminate(); let keep_alive = self.keep_alive; @@ -293,10 +307,12 @@ impl TaskManager { Box::pin(async move { let mut t1 = self.essential_failed_rx.next().fuse(); let mut t2 = self.on_exit.clone().fuse(); + let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); futures::select! { _ = t1 => Err(Error::Other("Essential task failed.".into())), _ = t2 => Ok(()), + (res, _, _) = t3 => res, } }) } @@ -314,6 +330,13 @@ impl TaskManager { pub(super) fn keep_alive(&mut self, to_keep_alive: T) { self.keep_alive = Box::new(to_keep_alive); } + + /// Register another TaskManager to terminate and gracefully shutdown when the parent + /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential + /// task fails. + pub fn add_children(&mut self, child: TaskManager) { + self.children.push(Box::new(child)); + } } #[derive(Clone)] From 3abd7d163beb3b3ba04264fe978bfde8365df1e0 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 30 Jul 2020 16:14:27 +0200 Subject: [PATCH 03/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index cc52db39f31de..8b53d352d4246 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -18,7 +18,7 @@ use exit_future::Signal; use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, - future::{select, Either, BoxFuture, select_all}, + future::{select, Either, BoxFuture, select_all, join_all}, sink::SinkExt, }; use prometheus_endpoint::{ @@ -287,10 +287,12 @@ impl TaskManager { /// objects properly otherwise it triggers a SIGABRT on exit. pub fn clean_shutdown(mut self) -> Pin + Send>> { self.terminate(); + let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown()); let keep_alive = self.keep_alive; let completion_future = self.completion_future; Box::pin(async move { + join_all(children_shutdowns).await; completion_future.await; drop(keep_alive); }) @@ -321,8 +323,11 @@ impl TaskManager { pub fn terminate(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.fire(); - // NOTE: task will prevent new tasks to be spawned + // NOTE: this will prevent new tasks to be spawned self.task_notifier.close_channel(); + for child in self.children.iter_mut() { + child.terminate(); + } } } From c07b41683bd0d947a736e55f036cc675b284a197 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 30 Jul 2020 16:28:28 +0200 Subject: [PATCH 04/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 8b53d352d4246..a3b8f13e5a9fe 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -311,10 +311,16 @@ impl TaskManager { let mut t2 = self.on_exit.clone().fuse(); let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); - futures::select! { - _ = t1 => Err(Error::Other("Essential task failed.".into())), - _ = t2 => Ok(()), - (res, _, _) = t3 => res, + loop { + futures::select! { + _ = t1 => break Err(Error::Other("Essential task failed.".into())), + _ = t2 => break Ok(()), + (res, _, _) = t3 => if res.is_err() { + break res; + } else { + continue; + }, + } } }) } From c572f4d8d6f11b65f10bec1ccc765baf30b4fdf9 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 30 Jul 2020 16:35:37 +0200 Subject: [PATCH 05/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index a3b8f13e5a9fe..be229fcc9cbc9 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -344,7 +344,7 @@ impl TaskManager { /// Register another TaskManager to terminate and gracefully shutdown when the parent /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential - /// task fails. + /// task fails. (But don't end the parent if a child terminates without failure.) pub fn add_children(&mut self, child: TaskManager) { self.children.push(Box::new(child)); } From b0cf50206f38fc374d173cc28a04aa939e0e8d46 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 5 Aug 2020 09:27:49 +0200 Subject: [PATCH 06/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 32 +++++++++++++++++--------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index be229fcc9cbc9..6cf306e120b54 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -309,17 +309,27 @@ impl TaskManager { Box::pin(async move { let mut t1 = self.essential_failed_rx.next().fuse(); let mut t2 = self.on_exit.clone().fuse(); - let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); - - loop { - futures::select! { - _ = t1 => break Err(Error::Other("Essential task failed.".into())), - _ = t2 => break Ok(()), - (res, _, _) = t3 => if res.is_err() { - break res; - } else { - continue; - }, + + if self.children.is_empty() { + loop { + futures::select! { + _ = t1 => break Err(Error::Other("Essential task failed.".into())), + _ = t2 => break Ok(()), + } + } + } else { + let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); + + loop { + futures::select! { + _ = t1 => break Err(Error::Other("Essential task failed.".into())), + _ = t2 => break Ok(()), + (res, _, _) = t3 => if res.is_err() { + break res; + } else { + continue; + }, + } } } }) From 3d6e03c8809b49894c021a2d0f06949cd763c0de Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 5 Aug 2020 10:04:47 +0200 Subject: [PATCH 07/14] WIP Forked at: 19c1d9028d8d6eabef41693433b56e14da025247 Parent branch: origin/master --- client/service/src/task_manager/mod.rs | 4 +- client/service/src/task_manager/tests.rs | 106 ++++++++++++++++++++++- 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 6cf306e120b54..f7bcb9dedee29 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -347,14 +347,14 @@ impl TaskManager { } } - /// Set what the task manager should keep alivei + /// Set what the task manager should keep alive. pub(super) fn keep_alive(&mut self, to_keep_alive: T) { self.keep_alive = Box::new(to_keep_alive); } /// Register another TaskManager to terminate and gracefully shutdown when the parent /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential - /// task fails. (But don't end the parent if a child terminates without failure.) + /// task fails. (But don't end the parent if a child's normal task fails.) pub fn add_children(&mut self, child: TaskManager) { self.children.push(Box::new(child)); } diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index c60d15b3394c3..421d9ff043bad 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -18,7 +18,7 @@ use crate::config::TaskExecutor; use crate::task_manager::TaskManager; -use futures::future::FutureExt; +use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; use std::any::Any; use std::sync::Arc; @@ -82,7 +82,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) } #[test] -fn ensure_futures_are_awaited_on_shutdown() { +fn ensure_tasks_are_awaited_on_shutdown() { let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -187,7 +187,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { } #[test] -fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { +fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -208,3 +208,103 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } + +#[test] +fn ensure_children_tasks_ends_when_task_manager_terminated() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap(); + let child_1 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_1 = child_1.spawn_handle(); + let child_2 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_2 = child_2.spawn_handle(); + task_manager.add_children(child_1); + task_manager.add_children(child_2); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 4); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 4); + task_manager.terminate(); + runtime.block_on(task_manager.future()).expect("future has ended without error"); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap(); + let child_1 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_1 = child_1.spawn_handle(); + let spawn_essential_handle_child_1 = child_1.spawn_essential_handle(); + let child_2 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_2 = child_2.spawn_handle(); + task_manager.add_children(child_1); + task_manager.add_children(child_2); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 4); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 4); + spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") }); + runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); + assert_eq!(drop_tester, 4); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor.clone(), None).unwrap(); + let child_1 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_1 = child_1.spawn_handle(); + let child_2 = TaskManager::new(task_executor.clone(), None).unwrap(); + let spawn_handle_child_2 = child_2.spawn_handle(); + task_manager.add_children(child_1); + task_manager.add_children(child_2); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 4); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 4); + spawn_handle_child_1.spawn("task5", async { panic!("task failed") }); + runtime.block_on(async { + let t1 = task_manager.future().fuse(); + let t2 = tokio::time::delay_for(Duration::from_secs(3)).fuse(); + + pin_mut!(t1, t2); + + select! { + _ = t1 => panic!("task should not have stopped"), + _ = t2 => {}, + } + }); + assert_eq!(drop_tester, 4); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} From 352a69689e49052e8941497cee66332eeaadebb4 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 5 Aug 2020 10:13:04 +0200 Subject: [PATCH 08/14] changelog --- docs/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 333733d1aee01..719059477e13e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog]. ## Unreleased +Client +------ + +* Child nodes can be handled by adding a child `TaskManager` to the parent's `TaskManager` (#6771) + ## 2.0.0-rc4 -> 2.0.0-rc5 – River Dolphin Runtime From 06e4eccdb2577b237104e8147a4e540c0ca09089 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 5 Aug 2020 12:20:24 +0200 Subject: [PATCH 09/14] Remove Box --- client/service/src/task_manager/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index f7bcb9dedee29..90fcc6c727eed 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -221,7 +221,7 @@ pub struct TaskManager { /// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. - children: Vec>, + children: Vec, } impl TaskManager { @@ -356,7 +356,7 @@ impl TaskManager { /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. (But don't end the parent if a child's normal task fails.) pub fn add_children(&mut self, child: TaskManager) { - self.children.push(Box::new(child)); + self.children.push(child); } } From 49fb8fb6f245c3ca2c384468df14b34f34616736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 6 Aug 2020 11:29:08 +0200 Subject: [PATCH 10/14] Make future nicer --- client/service/src/task_manager/mod.rs | 44 +++++++++----------------- 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 90fcc6c727eed..39907b220b22a 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -18,7 +18,7 @@ use exit_future::Signal; use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, - future::{select, Either, BoxFuture, select_all, join_all}, + future::{select, Either, BoxFuture, join_all, try_join_all, select_all, self}, sink::SinkExt, }; use prometheus_endpoint::{ @@ -305,34 +305,20 @@ impl TaskManager { /// /// This function will not wait until the end of the remaining task. You must call and await /// `clean_shutdown()` after this. - pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { - Box::pin(async move { - let mut t1 = self.essential_failed_rx.next().fuse(); - let mut t2 = self.on_exit.clone().fuse(); - - if self.children.is_empty() { - loop { - futures::select! { - _ = t1 => break Err(Error::Other("Essential task failed.".into())), - _ = t2 => break Ok(()), - } - } - } else { - let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); - - loop { - futures::select! { - _ = t1 => break Err(Error::Other("Essential task failed.".into())), - _ = t2 => break Ok(()), - (res, _, _) = t3 => if res.is_err() { - break res; - } else { - continue; - }, - } - } - } - }) + pub fn future<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>> { + let t1 = self.essential_failed_rx + .next() + .then(|_| future::ready(Err(Error::Other("Essential task failed.".into())))) + .boxed(); + let t2 = self.on_exit.clone().then(|_| future::ready(Ok(()))).boxed(); + let res = select_all(vec![t1, t2]).map(|v| v.0).boxed(); + + // Join all the task managers to make sure everything has finished as intended. + try_join_all( + self.children.iter_mut().map(|x| x.future().boxed()).chain(std::iter::once(res)), + ).map(|res| res.map(drop)).boxed() } /// Signal to terminate all the running tasks. From 02c2ece0f15f90578d72f2be747bcfd74524e59b Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 6 Aug 2020 13:24:49 +0200 Subject: [PATCH 11/14] Revert "Make future nicer" This reverts commit 49fb8fb6f245c3ca2c384468df14b34f34616736. --- client/service/src/task_manager/mod.rs | 44 +++++++++++++++++--------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 39907b220b22a..90fcc6c727eed 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -18,7 +18,7 @@ use exit_future::Signal; use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, - future::{select, Either, BoxFuture, join_all, try_join_all, select_all, self}, + future::{select, Either, BoxFuture, select_all, join_all}, sink::SinkExt, }; use prometheus_endpoint::{ @@ -305,20 +305,34 @@ impl TaskManager { /// /// This function will not wait until the end of the remaining task. You must call and await /// `clean_shutdown()` after this. - pub fn future<'a>( - &'a mut self, - ) -> Pin> + Send + 'a>> { - let t1 = self.essential_failed_rx - .next() - .then(|_| future::ready(Err(Error::Other("Essential task failed.".into())))) - .boxed(); - let t2 = self.on_exit.clone().then(|_| future::ready(Ok(()))).boxed(); - let res = select_all(vec![t1, t2]).map(|v| v.0).boxed(); - - // Join all the task managers to make sure everything has finished as intended. - try_join_all( - self.children.iter_mut().map(|x| x.future().boxed()).chain(std::iter::once(res)), - ).map(|res| res.map(drop)).boxed() + pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { + Box::pin(async move { + let mut t1 = self.essential_failed_rx.next().fuse(); + let mut t2 = self.on_exit.clone().fuse(); + + if self.children.is_empty() { + loop { + futures::select! { + _ = t1 => break Err(Error::Other("Essential task failed.".into())), + _ = t2 => break Ok(()), + } + } + } else { + let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); + + loop { + futures::select! { + _ = t1 => break Err(Error::Other("Essential task failed.".into())), + _ = t2 => break Ok(()), + (res, _, _) = t3 => if res.is_err() { + break res; + } else { + continue; + }, + } + } + } + }) } /// Signal to terminate all the running tasks. From 4fdf8035f4aa8f163f11020aa3fa1f5757e1d459 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 6 Aug 2020 13:23:29 +0200 Subject: [PATCH 12/14] Simplify --- client/service/src/task_manager/mod.rs | 32 ++++++++------------------ 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 90fcc6c727eed..fd3cc9acea417 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -18,7 +18,7 @@ use exit_future::Signal; use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, - future::{select, Either, BoxFuture, select_all, join_all}, + future::{select, Either, BoxFuture, join_all, try_join_all, pending}, sink::SinkExt, }; use prometheus_endpoint::{ @@ -309,28 +309,14 @@ impl TaskManager { Box::pin(async move { let mut t1 = self.essential_failed_rx.next().fuse(); let mut t2 = self.on_exit.clone().fuse(); - - if self.children.is_empty() { - loop { - futures::select! { - _ = t1 => break Err(Error::Other("Essential task failed.".into())), - _ = t2 => break Ok(()), - } - } - } else { - let mut t3 = select_all(self.children.iter_mut().map(|x| x.future())).fuse(); - - loop { - futures::select! { - _ = t1 => break Err(Error::Other("Essential task failed.".into())), - _ = t2 => break Ok(()), - (res, _, _) = t3 => if res.is_err() { - break res; - } else { - continue; - }, - } - } + let mut t3 = try_join_all(self.children.iter_mut().map(|x| x.future())) + // Never end this future if there is no error + .then(|res| async { Ok(res.map(|_| pending::<()>())?.await) }).boxed().fuse(); + + futures::select! { + _ = t1 => Err(Error::Other("Essential task failed.".into())), + _ = t2 => Ok(()), + res = t3 => res.map(|_| ()), } }) } From 932e60126d61409c96b6783c730cd1a534f8d1e5 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 6 Aug 2020 13:35:20 +0200 Subject: [PATCH 13/14] Additional check --- client/service/src/task_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index fd3cc9acea417..9a35bbd4f94e5 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -316,7 +316,7 @@ impl TaskManager { futures::select! { _ = t1 => Err(Error::Other("Essential task failed.".into())), _ = t2 => Ok(()), - res = t3 => res.map(|_| ()), + res = t3 => Err(res.map(|_| ()).expect_err("this future never ends; qed")), } }) } From f3b520fca8d10bd5d2b488190125e4d189980aa9 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 6 Aug 2020 13:43:28 +0200 Subject: [PATCH 14/14] Simplify more --- client/service/src/task_manager/mod.rs | 9 ++++++--- client/service/src/task_manager/tests.rs | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 9a35bbd4f94e5..729b43bce94c1 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -309,9 +309,12 @@ impl TaskManager { Box::pin(async move { let mut t1 = self.essential_failed_rx.next().fuse(); let mut t2 = self.on_exit.clone().fuse(); - let mut t3 = try_join_all(self.children.iter_mut().map(|x| x.future())) - // Never end this future if there is no error - .then(|res| async { Ok(res.map(|_| pending::<()>())?.await) }).boxed().fuse(); + let mut t3 = try_join_all( + self.children.iter_mut().map(|x| x.future()) + // Never end this future if there is no error because if there is no children, + // it must not stop + .chain(std::iter::once(pending().boxed())) + ).fuse(); futures::select! { _ = t1 => Err(Error::Other("Essential task failed.".into())), diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 421d9ff043bad..a2bd84802aa17 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -300,7 +300,7 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { pin_mut!(t1, t2); select! { - _ = t1 => panic!("task should not have stopped"), + res = t1 => panic!("task should not have stopped: {:?}", res), _ = t2 => {}, } });