From 622049432d87616b82ea1aa16703255ade92dd47 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 7 Oct 2024 22:54:58 -0400 Subject: [PATCH 1/4] launched node abstraction --- Cargo.lock | 1 + bin/katana/src/cli/node.rs | 25 ++-- crates/dojo-test-utils/src/sequencer.rs | 17 +-- crates/katana/node/Cargo.toml | 1 + crates/katana/node/src/exit.rs | 41 ++++++ crates/katana/node/src/lib.rs | 164 +++++++++++++++--------- crates/katana/tasks/src/manager.rs | 53 ++++++-- 7 files changed, 215 insertions(+), 87 deletions(-) create mode 100644 crates/katana/node/src/exit.rs diff --git a/Cargo.lock b/Cargo.lock index 24f1e06269..7d34b34a0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8024,6 +8024,7 @@ version = "1.0.0-alpha.14" dependencies = [ "anyhow", "dojo-metrics", + "futures", "hyper 0.14.30", "jsonrpsee 0.16.3", "katana-core", diff --git a/bin/katana/src/cli/node.rs b/bin/katana/src/cli/node.rs index f4b062b4d1..483b629d7d 100644 --- a/bin/katana/src/cli/node.rs +++ b/bin/katana/src/cli/node.rs @@ -225,23 +225,30 @@ impl NodeArgs { let sequencer_config = self.sequencer_config(); let starknet_config = self.starknet_config()?; - // build the node and start it - let node = katana_node::start(server_config, sequencer_config, starknet_config).await?; + // Build the node + let node = katana_node::build(server_config, sequencer_config, starknet_config).await?; if !self.silent { #[allow(deprecated)] let genesis = &node.backend.config.genesis; - print_intro(&self, genesis, node.rpc.addr); + let server_address = node.server_config.addr(); + print_intro(&self, genesis, &server_address); } - // Wait until an OS signal is received or TaskManager shutdown + // Launch the node + let handle = node.launch().await.context("failed to launch node")?; + + // Wait until an OS signal (ie SIGINT, SIGTERM) is received or the node is shutdown. tokio::select! { - _ = dojo_utils::signal::wait_signals() => {}, - _ = node.task_manager.wait_for_shutdown() => {} + _ = dojo_utils::signal::wait_signals() => { + // Gracefully shutdown the node before exiting + handle.stop().await?; + }, + + _ = handle.stopped() => { } } - info!("Shutting down..."); - node.stop().await?; + info!("Shutting down."); Ok(()) } @@ -339,7 +346,7 @@ impl NodeArgs { } } -fn print_intro(args: &NodeArgs, genesis: &Genesis, address: SocketAddr) { +fn print_intro(args: &NodeArgs, genesis: &Genesis, address: &str) { let mut accounts = genesis.accounts().peekable(); let account_class_hash = accounts.peek().map(|e| e.1.class_hash()); let seed = &args.starknet.seed; diff --git a/crates/dojo-test-utils/src/sequencer.rs b/crates/dojo-test-utils/src/sequencer.rs index 039b26fab1..80bb9b2e0a 100644 --- a/crates/dojo-test-utils/src/sequencer.rs +++ b/crates/dojo-test-utils/src/sequencer.rs @@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS; #[allow(deprecated)] pub use katana_core::sequencer::SequencerConfig; use katana_executor::implementation::blockifier::BlockifierFactory; -use katana_node::Handle; +use katana_node::LaunchedNode; use katana_primitives::chain::ChainId; use katana_rpc::config::ServerConfig; use katana_rpc_api::ApiKind; @@ -29,7 +29,7 @@ pub struct TestAccount { #[allow(missing_debug_implementations)] pub struct TestSequencer { url: Url, - handle: Handle, + handle: LaunchedNode, account: TestAccount, } @@ -45,19 +45,20 @@ impl TestSequencer { apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii], }; - let node = katana_node::start(server_config, config, starknet_config) + let node = katana_node::build(server_config, config, starknet_config) .await .expect("Failed to build node components"); + let handle = node.launch().await.expect("Failed to launch node"); - let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL"); + let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL"); - let account = node.backend.config.genesis.accounts().next().unwrap(); + let account = handle.node.backend.config.genesis.accounts().next().unwrap(); let account = TestAccount { private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()), account_address: Felt::from_bytes_be(&account.0.to_bytes_be()), }; - TestSequencer { handle: node, account, url } + TestSequencer { handle, account, url } } pub fn account(&self) -> SingleOwnerAccount, LocalWallet> { @@ -79,7 +80,7 @@ impl TestSequencer { } pub fn backend(&self) -> &Arc> { - &self.handle.backend + &self.handle.node.backend } pub fn account_at_index( @@ -87,7 +88,7 @@ impl TestSequencer { index: usize, ) -> SingleOwnerAccount, LocalWallet> { #[allow(deprecated)] - let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>(); + let accounts: Vec<_> = self.handle.node.backend.config.genesis.accounts().collect::<_>(); let account = accounts[index]; let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()); diff --git a/crates/katana/node/Cargo.toml b/crates/katana/node/Cargo.toml index e0973df04f..45857e5134 100644 --- a/crates/katana/node/Cargo.toml +++ b/crates/katana/node/Cargo.toml @@ -19,6 +19,7 @@ katana-tasks.workspace = true anyhow.workspace = true dojo-metrics.workspace = true +futures.workspace = true hyper.workspace = true jsonrpsee.workspace = true num-traits.workspace = true diff --git a/crates/katana/node/src/exit.rs b/crates/katana/node/src/exit.rs new file mode 100644 index 0000000000..de1e42c9f3 --- /dev/null +++ b/crates/katana/node/src/exit.rs @@ -0,0 +1,41 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Result; +use futures::future::BoxFuture; +use futures::FutureExt; + +use crate::LaunchedNode; + +/// A Future that is resolved once the node has been stopped including all of its running tasks. +#[must_use = "futures do nothing unless polled"] +pub struct NodeStoppedFuture<'a> { + fut: BoxFuture<'a, Result<()>>, +} + +impl<'a> NodeStoppedFuture<'a> { + pub(crate) fn new(handle: &'a LaunchedNode) -> Self { + let fut = Box::pin(async { + handle.node.task_manager.wait_for_shutdown().await; + handle.stop().await?; + Ok(()) + }); + Self { fut } + } +} + +impl<'a> Future for NodeStoppedFuture<'a> { + type Output = Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + this.fut.poll_unpin(cx) + } +} + +impl<'a> core::fmt::Debug for NodeStoppedFuture<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("NodeStoppedFuture").field("fut", &"...").finish() + } +} diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 452a12307d..f5d638f456 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -1,5 +1,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] +mod exit; + use std::future::IntoFuture; use std::net::SocketAddr; use std::sync::Arc; @@ -19,6 +21,7 @@ use katana_core::env::BlockContextGenerator; #[allow(deprecated)] use katana_core::sequencer::SequencerConfig; use katana_core::service::block_producer::BlockProducer; +use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; use katana_pipeline::{stage, Pipeline}; @@ -49,27 +52,107 @@ use starknet::providers::{JsonRpcClient, Provider}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, trace}; -/// A handle to the instantiated Katana node. +use crate::exit::NodeStoppedFuture; + +/// A handle to the launched Katana node. #[allow(missing_debug_implementations)] -pub struct Handle { - pub pool: TxPool, +pub struct LaunchedNode { + pub node: Node, + /// Handle to the rpc server. pub rpc: RpcServer, +} + +impl LaunchedNode { + /// Stops the node. + /// + /// This will instruct the node to stop and wait until it has actually stop. + pub async fn stop(&self) -> Result<()> { + // TODO: wait for the rpc server to stop instead of just stopping it. + self.rpc.handle.stop()?; + self.node.task_manager.shutdown().await; + Ok(()) + } + + /// Returns a future which resolves only when the node has stopped. + pub fn stopped(&self) -> NodeStoppedFuture<'_> { + NodeStoppedFuture::new(self) + } +} + +/// A node instance. +/// +/// The struct contains the handle to all the components of the node. +#[must_use = "Node does nothing unless launched."] +#[allow(missing_debug_implementations)] +pub struct Node { + pub pool: TxPool, + pub db: Option, pub task_manager: TaskManager, pub backend: Arc>, pub block_producer: BlockProducer, + pub server_config: ServerConfig, + #[allow(deprecated)] + pub sequencer_config: SequencerConfig, } -impl Handle { - /// Stops the Katana node. - pub async fn stop(self) -> Result<()> { - // TODO: wait for the rpc server to stop - self.rpc.handle.stop()?; - self.task_manager.shutdown().await; - Ok(()) +impl Node { + /// Start the node. + /// + /// This method will start all the node process, running them until the node is stopped. + pub async fn launch(self) -> Result { + // Metrics recorder must be initialized before calling any of the metrics macros, in order + // for it to be registered. + + if let Some(addr) = self.server_config.metrics { + let prometheus_handle = prometheus_exporter::install_recorder("katana")?; + let mut reports = Vec::new(); + + if let Some(ref db) = self.db { + reports.push(Box::new(db.clone()) as Box); + } + + prometheus_exporter::serve( + addr, + prometheus_handle, + metrics_process::Collector::default(), + reports, + ) + .await?; + + info!(%addr, "Metrics endpoint started."); + } + + let pool = self.pool.clone(); + let backend = self.backend.clone(); + let block_producer = self.block_producer.clone(); + let validator = self.block_producer.validator().clone(); + + // --- build sequencing stage + + #[allow(deprecated)] + let sequencing = stage::Sequencing::new( + pool.clone(), + backend.clone(), + self.task_manager.clone(), + block_producer.clone(), + self.sequencer_config.messaging.clone(), + ); + + // --- build and start the pipeline + + let mut pipeline = Pipeline::new(); + pipeline.add_stage(Box::new(sequencing)); + + self.task_manager.spawn(pipeline.into_future()); + + let node_components = (pool, backend, block_producer, validator); + let rpc = spawn(node_components, self.server_config.clone()).await?; + + Ok(LaunchedNode { node: self, rpc }) } } -/// Build the core Katana components from the given configurations and start running the node. +/// Build the core Katana components from the given configurations. // TODO: placeholder until we implement a dedicated class that encapsulate building the node // components // @@ -79,11 +162,11 @@ impl Handle { // // NOTE: Don't rely on this function as it is mainly used as a placeholder for now. #[allow(deprecated)] -pub async fn start( +pub async fn build( server_config: ServerConfig, sequencer_config: SequencerConfig, mut starknet_config: StarknetConfig, -) -> Result { +) -> Result { // --- build executor factory let cfg_env = CfgEnv { @@ -189,52 +272,17 @@ pub async fn start( let validator = block_producer.validator(); let pool = TxPool::new(validator.clone(), FiFo::new()); - // --- build metrics service - - // Metrics recorder must be initialized before calling any of the metrics macros, in order for - // it to be registered. - if let Some(addr) = server_config.metrics { - let prometheus_handle = prometheus_exporter::install_recorder("katana")?; - let reports = db.map(|db| vec![Box::new(db) as Box]).unwrap_or_default(); - - prometheus_exporter::serve( - addr, - prometheus_handle, - metrics_process::Collector::default(), - reports, - ) - .await?; - - info!(%addr, "Metrics endpoint started."); - } - - // --- create a TaskManager using the ambient Tokio runtime - - let task_manager = TaskManager::current(); - - // --- build sequencing stage - - let sequencing = stage::Sequencing::new( - pool.clone(), - backend.clone(), - task_manager.clone(), - block_producer.clone(), - sequencer_config.messaging.clone(), - ); - - // --- build and start the pipeline - - let mut pipeline = Pipeline::new(); - pipeline.add_stage(Box::new(sequencing)); - - task_manager.spawn(pipeline.into_future()); - - // --- spawn rpc server - - let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator); - let rpc = spawn(node_components, server_config).await?; + let node = Node { + db, + pool, + backend, + server_config, + block_producer, + sequencer_config, + task_manager: TaskManager::current(), + }; - Ok(Handle { backend, block_producer, pool, rpc, task_manager }) + Ok(node) } // Moved from `katana_rpc` crate diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs index f79a7657e8..25bd33fb4f 100644 --- a/crates/katana/tasks/src/manager.rs +++ b/crates/katana/tasks/src/manager.rs @@ -1,8 +1,13 @@ -use std::future::Future; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures::future::BoxFuture; +use futures::FutureExt; use tokio::runtime::Handle; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +pub use tokio_util::sync::WaitForCancellationFuture as WaitForShutdownFuture; use tokio_util::task::TaskTracker; use crate::task::{TaskBuilder, TaskResult}; @@ -42,25 +47,29 @@ impl TaskManager { self.spawn_inner(fut) } - /// Wait for the shutdown signal to be received. - pub async fn wait_for_shutdown(&self) { - self.on_cancel.cancelled().await; + /// Returns a future that can be awaited for the shutdown signal to be received. + pub fn wait_for_shutdown(&self) -> WaitForShutdownFuture<'_> { + self.on_cancel.cancelled() } /// Shuts down the manager and wait until all currently running tasks are finished, either due /// to completion or cancellation. /// /// No task can be spawned on the manager after this method is called. - pub async fn shutdown(self) { - if !self.on_cancel.is_cancelled() { - self.on_cancel.cancel(); - } + pub fn shutdown(&self) -> ShutdownFuture<'_> { + let fut = Box::pin(async { + if !self.on_cancel.is_cancelled() { + self.on_cancel.cancel(); + } - self.wait_for_shutdown().await; + self.wait_for_shutdown().await; - // need to close the tracker first before waiting - let _ = self.tracker.close(); - self.tracker.wait().await; + // need to close the tracker first before waiting + let _ = self.tracker.close(); + self.tracker.wait().await; + }); + + ShutdownFuture { fut } } /// Return the handle to the Tokio runtime that the manager is associated with. @@ -117,6 +126,26 @@ impl Drop for TaskManager { } } +/// A futures that resolves when the [TaskManager] is shutdown. +#[must_use = "futures do nothing unless polled"] +pub struct ShutdownFuture<'a> { + fut: BoxFuture<'a, ()>, +} + +impl<'a> Future for ShutdownFuture<'a> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().fut.poll_unpin(cx) + } +} + +impl<'a> core::fmt::Debug for ShutdownFuture<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("ShutdownFuture").field("fut", &"...").finish() + } +} + #[cfg(test)] mod tests { use futures::future; From e654369d2ab19e9099164476a7dcf51ad7caa7ac Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 7 Oct 2024 23:09:19 -0400 Subject: [PATCH 2/4] comment --- crates/katana/node/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index f5d638f456..5c3e6b29bf 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -54,7 +54,7 @@ use tracing::{info, trace}; use crate::exit::NodeStoppedFuture; -/// A handle to the launched Katana node. +/// A handle to the launched node. #[allow(missing_debug_implementations)] pub struct LaunchedNode { pub node: Node, From 9f48bbfd6cfaa9d077e2c50fac5ab2353c6e6fc2 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 7 Oct 2024 23:10:50 -0400 Subject: [PATCH 3/4] add error context --- bin/katana/src/cli/node.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/katana/src/cli/node.rs b/bin/katana/src/cli/node.rs index 483b629d7d..d8193c26c3 100644 --- a/bin/katana/src/cli/node.rs +++ b/bin/katana/src/cli/node.rs @@ -226,7 +226,9 @@ impl NodeArgs { let starknet_config = self.starknet_config()?; // Build the node - let node = katana_node::build(server_config, sequencer_config, starknet_config).await?; + let node = katana_node::build(server_config, sequencer_config, starknet_config) + .await + .context("failed to build node")?; if !self.silent { #[allow(deprecated)] From 05597026696b31e23affacde5a3060be66119559 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 7 Oct 2024 23:32:22 -0400 Subject: [PATCH 4/4] install recorder first --- crates/katana/node/src/lib.rs | 14 ++++++++------ crates/metrics/src/prometheus_exporter.rs | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 5c3e6b29bf..5cefd5dcb1 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use dojo_metrics::prometheus_exporter::PrometheusHandle; use dojo_metrics::{metrics_process, prometheus_exporter, Report}; use hyper::{Method, Uri}; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; @@ -88,6 +89,7 @@ pub struct Node { pub pool: TxPool, pub db: Option, pub task_manager: TaskManager, + pub prometheus_handle: PrometheusHandle, pub backend: Arc>, pub block_producer: BlockProducer, pub server_config: ServerConfig, @@ -100,20 +102,15 @@ impl Node { /// /// This method will start all the node process, running them until the node is stopped. pub async fn launch(self) -> Result { - // Metrics recorder must be initialized before calling any of the metrics macros, in order - // for it to be registered. - if let Some(addr) = self.server_config.metrics { - let prometheus_handle = prometheus_exporter::install_recorder("katana")?; let mut reports = Vec::new(); - if let Some(ref db) = self.db { reports.push(Box::new(db.clone()) as Box); } prometheus_exporter::serve( addr, - prometheus_handle, + self.prometheus_handle.clone(), metrics_process::Collector::default(), reports, ) @@ -167,6 +164,10 @@ pub async fn build( sequencer_config: SequencerConfig, mut starknet_config: StarknetConfig, ) -> Result { + // Metrics recorder must be initialized before calling any of the metrics macros, in order + // for it to be registered. + let prometheus_handle = prometheus_exporter::install_recorder("katana")?; + // --- build executor factory let cfg_env = CfgEnv { @@ -279,6 +280,7 @@ pub async fn build( server_config, block_producer, sequencer_config, + prometheus_handle, task_manager: TaskManager::current(), }; diff --git a/crates/metrics/src/prometheus_exporter.rs b/crates/metrics/src/prometheus_exporter.rs index a84f9a9e55..deaa9a0133 100644 --- a/crates/metrics/src/prometheus_exporter.rs +++ b/crates/metrics/src/prometheus_exporter.rs @@ -9,7 +9,8 @@ use anyhow::{Context, Result}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use metrics::{describe_gauge, gauge}; -use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; +use metrics_exporter_prometheus::PrometheusBuilder; +pub use metrics_exporter_prometheus::PrometheusHandle; use metrics_util::layers::{PrefixLayer, Stack}; use crate::Report;