Skip to content

Commit

Permalink
On shutdown clear services from core and async runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Oct 3, 2023
1 parent 0ed185e commit 993bedb
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 7 deletions.
4 changes: 3 additions & 1 deletion components/consensusmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub struct ConsensusManager {
}

impl ConsensusManager {
pub const IDENT: &'static str = "consensus manager";

pub fn new(factory: Arc<dyn ConsensusFactory>) -> Self {
let (consensus, ctl) = factory.new_active_consensus();
Self { factory, inner: RwLock::new(ManagerInner::new(consensus, ctl)) }
Expand Down Expand Up @@ -150,7 +152,7 @@ impl ConsensusManager {

impl Service for ConsensusManager {
fn ident(self: Arc<Self>) -> &'static str {
"consensus manager"
Self::IDENT
}

fn start(self: Arc<Self>, _core: Arc<Core>) -> Vec<JoinHandle<()>> {
Expand Down
7 changes: 7 additions & 0 deletions core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl Core {
self.services.lock().unwrap().push(service);
}

pub fn find(&self, ident: &'static str) -> Option<Arc<dyn Service>> {
self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
}

/// Starts all services and blocks waiting to join them. For performing other operations in between
/// use start and join explicitly
pub fn run(self: &Arc<Core>) {
Expand Down Expand Up @@ -54,6 +58,9 @@ impl Core {
}
}

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("... core is shut down");
}
}
Expand Down
11 changes: 7 additions & 4 deletions core/src/task/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use std::{
};
use tokio::task::JoinHandle as TaskJoinHandle;

const ASYNC_RUNTIME: &str = "async-runtime";

/// AsyncRuntime registers async services and provides
/// a tokio Runtime to run them.
pub struct AsyncRuntime {
Expand All @@ -27,6 +25,8 @@ impl Default for AsyncRuntime {
}

impl AsyncRuntime {
pub const IDENT: &'static str = "async-runtime";

pub fn new(threads: usize) -> Self {
trace!("Creating the async-runtime service");
Self { threads, services: Mutex::new(Vec::new()) }
Expand All @@ -42,7 +42,7 @@ impl AsyncRuntime {

pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
trace!("initializing async-runtime service");
vec![thread::Builder::new().name(ASYNC_RUNTIME.to_string()).spawn(move || self.worker(core)).unwrap()]
vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
}

/// Launch a tokio Runtime and run the top-level async objects
Expand Down Expand Up @@ -106,6 +106,9 @@ impl AsyncRuntime {
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
try_join_all(futures).await.unwrap();

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("async-runtime worker stopped");
}

Expand All @@ -119,7 +122,7 @@ impl AsyncRuntime {

impl Service for AsyncRuntime {
fn ident(self: Arc<AsyncRuntime>) -> &'static str {
ASYNC_RUNTIME
Self::IDENT
}

fn start(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
Expand Down
2 changes: 1 addition & 1 deletion testing/integration/src/common/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Daemon {
pub rpc_port: u16,
pub p2p_port: u16,

core: Arc<Core>,
pub core: Arc<Core>,
workers: Option<Vec<std::thread::JoinHandle<()>>>,

_appdir_tempdir: TempDir,
Expand Down
32 changes: 31 additions & 1 deletion testing/integration/src/daemon_integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use kaspa_addresses::Address;
use kaspa_consensusmanager::ConsensusManager;
use kaspa_core::task::runtime::AsyncRuntime;
use kaspa_rpc_core::api::rpc::RpcApi;
use kaspad_lib::args::Args;

use crate::common::daemon::Daemon;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

#[tokio::test]
async fn daemon_sanity_test() {
Expand Down Expand Up @@ -59,3 +61,31 @@ async fn daemon_mining_test() {
assert_eq!(accepted_txs_pair.accepted_transaction_ids.len(), 1);
}
}

#[tokio::test]
async fn daemon_cleaning_test() {
kaspa_core::log::try_init_logger("info,kaspa_grpc_core=trace,kaspa_grpc_server=trace,kaspa_grpc_client=trace");
let args = Args { devnet: true, ..Default::default() };
let consensus_manager;
let async_runtime;
let core;
{
let mut kaspad1 = Daemon::new_random_with_args(args);
let dyn_consensus_manager = kaspad1.core.find(ConsensusManager::IDENT).unwrap();
let dyn_async_runtime = kaspad1.core.find(AsyncRuntime::IDENT).unwrap();
consensus_manager =
Arc::downgrade(&Arc::downcast::<kaspa_consensusmanager::ConsensusManager>(dyn_consensus_manager.arc_any()).unwrap());
async_runtime = Arc::downgrade(&Arc::downcast::<AsyncRuntime>(dyn_async_runtime.arc_any()).unwrap());
core = Arc::downgrade(&kaspad1.core);

let rpc_client1 = kaspad1.start().await;
rpc_client1.disconnect().await.unwrap();
drop(rpc_client1);
kaspad1.shutdown();
}
tokio::time::sleep(Duration::from_secs(4)).await;

assert_eq!(consensus_manager.strong_count(), 0);
assert_eq!(async_runtime.strong_count(), 0);
assert_eq!(core.strong_count(), 0);
}

0 comments on commit 993bedb

Please sign in to comment.