From a5514420e53fbd09fa0eb4d8de8f65822c18f660 Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Thu, 5 Jan 2023 23:14:24 +0000 Subject: [PATCH] Refactor of `GraphQL` service and `FuelService` to use `ServiceRunner` (#875) The final change of https://github.com/FuelLabs/fuel-core/pull/860 epic. Ref https://github.com/FuelLabs/fuel-core/issues/809 Reworked `RunnableService` to be `RunnableService` and `RunnableTask`. `RunnableService::initialize` replaced with the `RunnableService::into_task` method. `into_task` returns a runnable task that implements the `RunnableTask` trait. `into_task` may return another type after initialization. Updated all services to implement new traits. Implemented GraphQL service via `ServiceRunner`. Extracted the graph QL logic into a separate module(preparation to move this service into its own crate). Re-used`ServiceRunner` for `FuelService`. Replaced `Modules` with `SharedState` and `SubServices`. Added a new `Starting` state of the service lifecycle. Added functions to allow to await `Started` or `Stop` state. --- Cargo.lock | 21 +- bin/fuel-core/src/cli/run.rs | 3 +- crates/fuel-core/Cargo.toml | 1 + crates/fuel-core/src/executor.rs | 2 +- crates/fuel-core/src/graphql_api.rs | 21 + crates/fuel-core/src/graphql_api/service.rs | 223 ++++++++ crates/fuel-core/src/lib.rs | 7 + crates/fuel-core/src/schema/balance.rs | 12 +- crates/fuel-core/src/schema/block.rs | 18 +- crates/fuel-core/src/schema/chain.rs | 12 +- crates/fuel-core/src/schema/coin.rs | 2 +- crates/fuel-core/src/schema/contract.rs | 2 +- crates/fuel-core/src/schema/dap.rs | 2 +- crates/fuel-core/src/schema/message.rs | 2 +- crates/fuel-core/src/schema/node_info.rs | 19 +- crates/fuel-core/src/schema/resource.rs | 12 +- crates/fuel-core/src/schema/tx.rs | 18 +- crates/fuel-core/src/schema/tx/types.rs | 10 +- crates/fuel-core/src/service.rs | 401 ++++++++------- crates/fuel-core/src/service/adapters.rs | 3 +- crates/fuel-core/src/service/genesis.rs | 478 +++++++++--------- crates/fuel-core/src/service/graph_api.rs | 159 ------ .../service/{modules.rs => sub_services.rs} | 107 ++-- .../consensus_module/poa/src/service.rs | 28 +- .../consensus_module/poa/src/service_test.rs | 2 +- crates/services/p2p/src/service.rs | 25 +- crates/services/relayer/src/lib.rs | 2 +- crates/services/relayer/src/service.rs | 25 +- crates/services/relayer/tests/integration.rs | 6 +- crates/services/src/lib.rs | 2 + crates/services/src/service.rs | 202 +++++--- crates/services/txpool/src/service.rs | 25 +- .../txpool/src/service/test_helpers.rs | 12 +- crates/services/txpool/src/service/tests.rs | 2 +- .../services/txpool/src/service/tests_p2p.rs | 15 +- tests/tests/metrics.rs | 3 +- tests/tests/relayer.rs | 5 +- 37 files changed, 1092 insertions(+), 797 deletions(-) create mode 100644 crates/fuel-core/src/graphql_api.rs create mode 100644 crates/fuel-core/src/graphql_api/service.rs delete mode 100644 crates/fuel-core/src/service/graph_api.rs rename crates/fuel-core/src/service/{modules.rs => sub_services.rs} (62%) diff --git a/Cargo.lock b/Cargo.lock index a47f79a0cc2..c86081de36c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,9 +153,9 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "arc-swap" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "arrayref" @@ -2216,6 +2216,7 @@ dependencies = [ "fuel-core-types", "futures", "hex", + "hyper", "itertools", "mockall", "num_cpus", @@ -3296,9 +3297,9 @@ dependencies = [ [[package]] name = "interceptor" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ffaa4d24f546a18eaeee91f7b2c52e080e20e285e43cd7c27a527b4712cfdad" +checksum = "1e8a11ae2da61704edada656798b61c94b35ecac2c58eb955156987d5e6be90b" dependencies = [ "async-trait", "bytes", @@ -5318,9 +5319,9 @@ dependencies = [ [[package]] name = "rtcp" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c11171e7e37998dcf54a9e9d4a6e2e1932c994424c7d39bc6349fed1424c45c3" +checksum = "1919efd6d4a6a85d13388f9487549bb8e359f17198cc03ffd72f79b553873691" dependencies = [ "bytes", "thiserror", @@ -7037,9 +7038,9 @@ dependencies = [ [[package]] name = "webrtc-mdns" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2548ae970afc2ae8e0b0dbfdacd9602d426d4f0ff6cda4602a45c0fd7ceaa82a" +checksum = "f08dfd7a6e3987e255c4dbe710dde5d94d0f0574f8a21afa95d171376c143106" dependencies = [ "log", "socket2", @@ -7454,9 +7455,9 @@ dependencies = [ [[package]] name = "yasna" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "346d34a236c9d3e5f3b9b74563f238f955bbd05fa0b8b4efa53c130c43982f4c" +checksum = "aed2e7a52e3744ab4d0c05c20aa065258e84c49fd4226f5191b2ed29712710b4" dependencies = [ "time", ] diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index f68e890bd77..afd7bc305be 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -15,6 +15,7 @@ use fuel_core::{ config::default_consensus_dev_key, Config, DbType, + ServiceTrait, VMConfig, }, txpool::Config as TxPoolConfig, @@ -208,7 +209,7 @@ pub async fn exec(command: Command) -> anyhow::Result<()> { // initialize the server let server = FuelService::new_node(config).await?; // pause the main task while service is running - server.run().await; + server.await_stop().await?; Ok(()) } diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 28b957f3d07..cb8958efba3 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -36,6 +36,7 @@ fuel-core-types = { path = "../types", version = "0.15.1", features = [ ] } futures = "0.3" hex = { version = "0.4", features = ["serde"] } +hyper = { version = "0.14" } itertools = "0.10" num_cpus = "1.13" primitive-types = "0.12" diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index c0fafabe578..7a9bbad7cc3 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -112,7 +112,7 @@ use tracing::{ /// the transactions contained in the block and persist changes to the underlying database as needed. /// In production mode, block fields like transaction commitments are set based on the executed txs. /// In validation mode, the processed block commitments are compared with the proposed block. - +#[derive(Clone, Debug)] pub struct Executor { pub database: Database, pub config: Config, diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs new file mode 100644 index 00000000000..8342ce71105 --- /dev/null +++ b/crates/fuel-core/src/graphql_api.rs @@ -0,0 +1,21 @@ +use fuel_core_types::{ + blockchain::primitives::SecretKeyWrapper, + fuel_tx::ConsensusParameters, + secrecy::Secret, +}; +use std::net::SocketAddr; + +pub mod service; + +#[derive(Clone, Debug)] +pub struct Config { + pub addr: SocketAddr, + pub utxo_validation: bool, + pub manual_blocks_enabled: bool, + pub vm_backtrace: bool, + pub min_gas_price: u64, + pub max_tx: usize, + pub max_depth: usize, + pub transaction_parameters: ConsensusParameters, + pub consensus_key: Option>, +} diff --git a/crates/fuel-core/src/graphql_api/service.rs b/crates/fuel-core/src/graphql_api/service.rs new file mode 100644 index 00000000000..0cc9ea8d42e --- /dev/null +++ b/crates/fuel-core/src/graphql_api/service.rs @@ -0,0 +1,223 @@ +use crate::{ + graphql_api::Config, + schema::{ + build_schema, + dap, + CoreSchema, + }, + service::metrics::metrics, +}; +use async_graphql::{ + extensions::Tracing, + http::{ + playground_source, + GraphQLPlaygroundConfig, + }, + Request, + Response, +}; +use axum::{ + extract::{ + DefaultBodyLimit, + Extension, + }, + http::{ + header::{ + ACCESS_CONTROL_ALLOW_HEADERS, + ACCESS_CONTROL_ALLOW_METHODS, + ACCESS_CONTROL_ALLOW_ORIGIN, + }, + HeaderValue, + }, + response::{ + sse::Event, + Html, + IntoResponse, + Sse, + }, + routing::{ + get, + post, + }, + Json, + Router, +}; +use fuel_core_services::{ + RunnableService, + RunnableTask, + StateWatcher, +}; +use futures::Stream; +use serde_json::json; +use std::{ + future::Future, + net::{ + SocketAddr, + TcpListener, + }, + pin::Pin, + sync::Arc, +}; +use tokio_stream::StreamExt; +use tower_http::{ + set_header::SetResponseHeaderLayer, + trace::TraceLayer, +}; + +pub type Service = fuel_core_services::ServiceRunner; + +// TODO: When the port of DB will exist we need to replace it with `Box +pub type Database = crate::database::Database; +// TODO: When the port for `Executor` will exist we need to replace it with `Box +pub type Executor = crate::service::adapters::ExecutorAdapter; +// TODO: When the port of BlockProducer will exist we need to replace it with +// `Box +pub type BlockProducer = Arc>; +// TODO: When the port of TxPool will exist we need to replace it with +// `Box. In the future GraphQL should not be aware of `TxPool`. It should +// use only `Database` to receive all information about +pub type TxPool = crate::service::sub_services::TxPoolService; + +#[derive(Clone)] +pub struct SharedState { + pub bound_address: SocketAddr, +} + +pub struct NotInitializedTask { + router: Router, + listener: TcpListener, + bound_address: SocketAddr, +} + +pub struct Task { + // Ugly workaround because of https://github.com/hyperium/hyper/issues/2582 + server: Pin> + Send + 'static>>, +} + +#[async_trait::async_trait] +impl RunnableService for NotInitializedTask { + const NAME: &'static str = "GraphQL"; + + type SharedData = SharedState; + type Task = Task; + + fn shared_data(&self) -> Self::SharedData { + SharedState { + bound_address: self.bound_address, + } + } + + async fn into_task(self, state: &StateWatcher) -> anyhow::Result { + let mut state = state.clone(); + let server = axum::Server::from_tcp(self.listener) + .unwrap() + .serve(self.router.into_make_service()) + .with_graceful_shutdown(async move { + loop { + state.changed().await.expect("The service is destroyed"); + + if !state.borrow().started() { + return + } + } + }); + + Ok(Task { + server: Box::pin(server), + }) + } +} + +#[async_trait::async_trait] +impl RunnableTask for Task { + async fn run(&mut self) -> anyhow::Result { + self.server.as_mut().await?; + // The `axum::Server` has its internal loop. If `await` is finished, we get an internal + // error or stop signal. + Ok(false /* should_continue */) + } +} + +pub fn new_service( + config: Config, + database: Database, + producer: BlockProducer, + txpool: TxPool, + executor: Executor, +) -> anyhow::Result { + let network_addr = config.addr; + let params = config.transaction_parameters; + let schema = build_schema() + .data(config) + .data(database) + .data(producer) + .data(txpool) + .data(executor); + let schema = dap::init(schema, params).extension(Tracing).finish(); + + let router = Router::new() + .route("/playground", get(graphql_playground)) + .route("/graphql", post(graphql_handler).options(ok)) + .route( + "/graphql-sub", + post(graphql_subscription_handler).options(ok), + ) + .route("/metrics", get(metrics)) + .route("/health", get(health)) + .layer(Extension(schema)) + .layer(TraceLayer::new_for_http()) + .layer(SetResponseHeaderLayer::<_>::overriding( + ACCESS_CONTROL_ALLOW_ORIGIN, + HeaderValue::from_static("*"), + )) + .layer(SetResponseHeaderLayer::<_>::overriding( + ACCESS_CONTROL_ALLOW_METHODS, + HeaderValue::from_static("*"), + )) + .layer(SetResponseHeaderLayer::<_>::overriding( + ACCESS_CONTROL_ALLOW_HEADERS, + HeaderValue::from_static("*"), + )) + .layer(DefaultBodyLimit::disable()); + + let listener = TcpListener::bind(network_addr)?; + let bound_address = listener.local_addr()?; + + tracing::info!("Binding GraphQL provider to {}", bound_address); + + Ok(Service::new(NotInitializedTask { + router, + listener, + bound_address, + })) +} + +async fn graphql_playground() -> impl IntoResponse { + Html(playground_source(GraphQLPlaygroundConfig::new("/graphql"))) +} + +async fn health() -> Json { + Json(json!({ "up": true })) +} + +async fn graphql_handler( + schema: Extension, + req: Json, +) -> Json { + schema.execute(req.0).await.into() +} + +async fn graphql_subscription_handler( + schema: Extension, + req: Json, +) -> Sse>> { + let stream = schema + .execute_stream(req.0) + .map(|r| Ok(Event::default().json_data(r).unwrap())); + Sse::new(stream) + .keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text")) +} + +async fn ok() -> anyhow::Result<(), ()> { + Ok(()) +} diff --git a/crates/fuel-core/src/lib.rs b/crates/fuel-core/src/lib.rs index 5518e8b6662..76fa222c5b9 100644 --- a/crates/fuel-core/src/lib.rs +++ b/crates/fuel-core/src/lib.rs @@ -23,3 +23,10 @@ pub mod resource_query; pub mod schema; pub mod service; pub mod state; + +// In the future this module will be a separate crate for `fuel-core-graphql-api`. +mod graphql_api; + +pub mod fuel_core_graphql_api { + pub use crate::graphql_api::*; +} diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs index f9946f9749e..5fe6c6a8adf 100644 --- a/crates/fuel-core/src/schema/balance.rs +++ b/crates/fuel-core/src/schema/balance.rs @@ -1,12 +1,10 @@ use crate::{ - database::{ - resource::{ - AssetQuery, - AssetSpendTarget, - AssetsQuery, - }, - Database, + database::resource::{ + AssetQuery, + AssetSpendTarget, + AssetsQuery, }, + fuel_core_graphql_api::service::Database, schema::scalars::{ Address, AssetId, diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index ecef5b7eb02..f8586ef5664 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -3,8 +3,13 @@ use super::scalars::{ Tai64Timestamp, }; use crate::{ - database::Database, - executor::Executor, + fuel_core_graphql_api::{ + service::{ + Database, + Executor, + }, + Config as GraphQLConfig, + }, schema::{ scalars::{ BlockId, @@ -13,7 +18,6 @@ use crate::{ }, tx::types::Transaction, }, - service::Config, state::IterDirection, }; use anyhow::anyhow; @@ -332,7 +336,8 @@ impl BlockMutation { time: Option, ) -> async_graphql::Result { let db = ctx.data_unchecked::(); - let config = ctx.data_unchecked::().clone(); + let executor = ctx.data_unchecked::().clone(); + let config = ctx.data_unchecked::().clone(); if !config.manual_blocks_enabled { return Err( @@ -341,11 +346,6 @@ impl BlockMutation { } // todo!("trigger block production manually"); - let executor = Executor { - database: db.clone(), - config: config.clone(), - }; - let block_time = get_time_closure(db, time, blocks_to_produce.0)?; for idx in 0..blocks_to_produce.0 { diff --git a/crates/fuel-core/src/schema/chain.rs b/crates/fuel-core/src/schema/chain.rs index 0d79c54b26d..67a478204ed 100644 --- a/crates/fuel-core/src/schema/chain.rs +++ b/crates/fuel-core/src/schema/chain.rs @@ -1,10 +1,12 @@ use crate::{ - database::Database, + fuel_core_graphql_api::{ + service::Database, + Config as GraphQLConfig, + }, schema::{ block::Block, scalars::U64, }, - service::Config, }; use async_graphql::{ Context, @@ -111,11 +113,9 @@ impl ChainInfo { &self, ctx: &Context<'_>, ) -> async_graphql::Result { - let config = ctx.data_unchecked::(); + let config = ctx.data_unchecked::(); - Ok(ConsensusParameters( - config.chain_conf.transaction_parameters, - )) + Ok(ConsensusParameters(config.transaction_parameters)) } } diff --git a/crates/fuel-core/src/schema/coin.rs b/crates/fuel-core/src/schema/coin.rs index 2c7fd16a018..898ca611bc5 100644 --- a/crates/fuel-core/src/schema/coin.rs +++ b/crates/fuel-core/src/schema/coin.rs @@ -1,5 +1,5 @@ use crate::{ - database::Database, + fuel_core_graphql_api::service::Database, schema::scalars::{ Address, AssetId, diff --git a/crates/fuel-core/src/schema/contract.rs b/crates/fuel-core/src/schema/contract.rs index 7df952ffbac..b76f6066f56 100644 --- a/crates/fuel-core/src/schema/contract.rs +++ b/crates/fuel-core/src/schema/contract.rs @@ -1,5 +1,5 @@ use crate::{ - database::Database, + fuel_core_graphql_api::service::Database, schema::scalars::{ AssetId, ContractId, diff --git a/crates/fuel-core/src/schema/dap.rs b/crates/fuel-core/src/schema/dap.rs index 9f17df3304f..8f10369c66a 100644 --- a/crates/fuel-core/src/schema/dap.rs +++ b/crates/fuel-core/src/schema/dap.rs @@ -2,8 +2,8 @@ use crate::{ database::{ transactional::DatabaseTransaction, vm_database::VmDatabase, - Database, }, + fuel_core_graphql_api::service::Database, schema::scalars::U64, }; use async_graphql::{ diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index 8ab66d40c32..1b11b4e4428 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -10,7 +10,7 @@ use super::{ }, }; use crate::{ - database::Database, + fuel_core_graphql_api::service::Database, query::MessageProofData, }; use async_graphql::{ diff --git a/crates/fuel-core/src/schema/node_info.rs b/crates/fuel-core/src/schema/node_info.rs index 4c360cc71b3..defb1f846fb 100644 --- a/crates/fuel-core/src/schema/node_info.rs +++ b/crates/fuel-core/src/schema/node_info.rs @@ -1,5 +1,5 @@ use super::scalars::U64; -use crate::service::Config; +use crate::fuel_core_graphql_api::Config as GraphQLConfig; use async_graphql::{ Context, Object, @@ -47,21 +47,16 @@ pub struct NodeQuery {} #[Object] impl NodeQuery { async fn node_info(&self, ctx: &Context<'_>) -> async_graphql::Result { - let Config { - utxo_validation, - vm, - txpool, - .. - } = ctx.data_unchecked::(); + let config = ctx.data_unchecked::(); const VERSION: &str = env!("CARGO_PKG_VERSION"); Ok(NodeInfo { - utxo_validation: *utxo_validation, - vm_backtrace: vm.backtrace, - min_gas_price: txpool.min_gas_price.into(), - max_tx: (txpool.max_tx as u64).into(), - max_depth: (txpool.max_depth as u64).into(), + utxo_validation: config.utxo_validation, + vm_backtrace: config.vm_backtrace, + min_gas_price: config.min_gas_price.into(), + max_tx: (config.max_tx as u64).into(), + max_depth: (config.max_depth as u64).into(), node_version: VERSION.to_owned(), }) } diff --git a/crates/fuel-core/src/schema/resource.rs b/crates/fuel-core/src/schema/resource.rs index ebdbe525363..2532e34a162 100644 --- a/crates/fuel-core/src/schema/resource.rs +++ b/crates/fuel-core/src/schema/resource.rs @@ -1,7 +1,8 @@ use crate::{ - database::{ - resource::AssetSpendTarget, - Database, + database::resource::AssetSpendTarget, + fuel_core_graphql_api::{ + service::Database, + Config as GraphQLConfig, }, resource_query::{ random_improve, @@ -18,7 +19,6 @@ use crate::{ U64, }, }, - service::Config, }; use async_graphql::{ Context, @@ -80,7 +80,7 @@ impl ResourceQuery { #[graphql(desc = "The excluded resources from the selection.")] excluded_ids: Option, ) -> async_graphql::Result>> { - let config = ctx.data_unchecked::(); + let config = ctx.data_unchecked::(); let owner: fuel_tx::Address = owner.0; let query_per_asset = query_per_asset @@ -91,7 +91,7 @@ impl ResourceQuery { e.amount.0, e.max .map(|max| max.0) - .unwrap_or(config.chain_conf.transaction_parameters.max_inputs), + .unwrap_or(config.transaction_parameters.max_inputs), ) }) .collect_vec(); diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index eedcfb4a268..c34d75ebabf 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -1,7 +1,9 @@ use crate::{ - database::{ - transaction::OwnedTransactionIndexCursor, + database::transaction::OwnedTransactionIndexCursor, + fuel_core_graphql_api::service::{ + BlockProducer, Database, + TxPool, }, query::{ transaction_status_change, @@ -13,7 +15,6 @@ use crate::{ SortedTxCursor, TransactionId, }, - service::modules::TxPoolService, state::IterDirection, }; use async_graphql::{ @@ -76,7 +77,7 @@ impl TxQuery { ) -> async_graphql::Result> { let db = ctx.data_unchecked::(); let id = id.0; - let txpool = ctx.data_unchecked::(); + let txpool = ctx.data_unchecked::(); if let Some(transaction) = txpool.shared.find_one(id) { Ok(Some(Transaction(transaction.tx().clone().deref().into()))) @@ -220,8 +221,7 @@ impl TxMutation { // for read-only calls. utxo_validation: Option, ) -> async_graphql::Result> { - let block_producer = - ctx.data_unchecked::>>(); + let block_producer = ctx.data_unchecked::(); let mut tx = FuelTx::from_bytes(&tx.0)?; tx.precompute(); @@ -236,7 +236,7 @@ impl TxMutation { ctx: &Context<'_>, tx: HexString, ) -> async_graphql::Result { - let txpool = ctx.data_unchecked::(); + let txpool = ctx.data_unchecked::(); let mut tx = FuelTx::from_bytes(&tx.0)?; tx.precompute(); let _: Vec<_> = txpool @@ -254,7 +254,7 @@ impl TxMutation { pub struct TxStatusSubscription; struct StreamState { - txpool: TxPoolService, + txpool: TxPool, db: Database, } @@ -277,7 +277,7 @@ impl TxStatusSubscription { ctx: &Context<'_>, #[graphql(desc = "The ID of the transaction")] id: TransactionId, ) -> impl Stream> { - let txpool = ctx.data_unchecked::().clone(); + let txpool = ctx.data_unchecked::().clone(); let db = ctx.data_unchecked::().clone(); let rx = BroadcastStream::new(txpool.shared.tx_update_subscribe()); let state = Box::new(StreamState { txpool, db }); diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index a9da03a6cdd..dd7ef7f95e0 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -4,7 +4,10 @@ use super::{ receipt::Receipt, }; use crate::{ - database::Database, + fuel_core_graphql_api::service::{ + Database, + TxPool, + }, schema::{ block::Block, contract::Contract, @@ -19,7 +22,6 @@ use crate::{ U64, }, }, - service::modules::TxPoolService, }; use async_graphql::{ Context, @@ -405,7 +407,7 @@ impl Transaction { ) -> async_graphql::Result> { let id = self.0.id(); let db = ctx.data_unchecked::(); - let txpool = ctx.data_unchecked::(); + let txpool = ctx.data_unchecked::(); get_tx_status(id, db, txpool).await } @@ -498,7 +500,7 @@ impl Transaction { pub(super) async fn get_tx_status( id: fuel_core_types::fuel_types::Bytes32, db: &Database, - txpool: &TxPoolService, + txpool: &TxPool, ) -> async_graphql::Result> { match db.get_tx_status(&id)? { Some(status) => Ok(Some(status.into())), diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 6fec26cb5a7..46856ee5704 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -1,15 +1,18 @@ -use crate::database::Database; -use anyhow::Error as AnyError; -use futures::FutureExt; -use modules::Modules; +use crate::{ + database::Database, + service::adapters::P2PAdapter, +}; +use fuel_core_services::{ + RunnableService, + RunnableTask, + ServiceRunner, + State, + StateWatcher, +}; use std::{ net::SocketAddr, panic, }; -use tokio::{ - sync::oneshot, - task::JoinHandle, -}; use tracing::log::warn; pub use config::{ @@ -17,40 +20,59 @@ pub use config::{ DbType, VMConfig, }; +pub use fuel_core_services::Service as ServiceTrait; pub mod adapters; pub mod config; pub(crate) mod genesis; -pub mod graph_api; pub mod metrics; -pub mod modules; +pub mod sub_services; -pub struct FuelService { - handle: JoinHandle<()>, - /// Shutdown the fuel service. - shutdown: oneshot::Sender<()>, +#[derive(Clone)] +pub struct SharedState { + /// The transaction pool shared state. + pub txpool: fuel_core_txpool::service::SharedState, + /// The P2P network shared state. + #[cfg(feature = "p2p")] + pub network: fuel_core_p2p::service::SharedState, #[cfg(feature = "relayer")] - /// Relayer handle - relayer_handle: Option, - /// The address bound by the system for serving the API - pub bound_address: SocketAddr, + /// The Relayer shared state. + pub relayer: Option, + /// The GraphQL shared state. + pub graph_ql: crate::fuel_core_graphql_api::service::SharedState, } -struct FuelServiceInner { - tasks: Vec>>, - /// handler for all modules. - modules: Modules, +pub struct FuelService { + /// The `ServiceRunner` used for `FuelService`. + /// + /// # Dev-note: The `FuelService` is already exposed as a public API and used by many crates. + /// To provide a user-friendly API and avoid breaking many downstream crates, `ServiceRunner` + /// is wrapped inside. + runner: ServiceRunner, + /// The shared state of the service + pub shared: SharedState, /// The address bound by the system for serving the API pub bound_address: SocketAddr, - /// Shutdown the graphql api - stop_graphql_api: oneshot::Sender<()>, } impl FuelService { - /// Create a fuel node instance from service config - #[tracing::instrument(skip(config))] - pub async fn new_node(mut config: Config) -> Result { + /// Creates a `FuelService` instance from service config + #[tracing::instrument(skip_all)] + pub fn new(database: Database, mut config: Config) -> anyhow::Result { Self::make_config_consistent(&mut config); + let task = Task::new(database, config)?; + let runner = ServiceRunner::new(task); + let shared = runner.shared.clone(); + let bound_address = runner.shared.graph_ql.bound_address; + Ok(FuelService { + bound_address, + shared, + runner, + }) + } + + /// Creates and starts fuel node instance from service config + pub async fn new_node(config: Config) -> anyhow::Result { // initialize database let database = match config.database_type { #[cfg(feature = "rocksdb")] @@ -59,44 +81,37 @@ impl FuelService { #[cfg(not(feature = "rocksdb"))] _ => Database::in_memory(), }; - // initialize service - Ok(Self::spawn_service( - Self::init_service(database, config).await?, - )) + + Self::from_database(database, config).await } - fn spawn_service(service: FuelServiceInner) -> Self { - let bound_address = service.bound_address; - let (shutdown, stop_rx) = oneshot::channel(); - - #[cfg(feature = "relayer")] - let relayer_handle = service - .modules - .relayer - .as_ref() - .map(|relayer| relayer.shared.clone()); - - let handle = tokio::spawn(async move { - let run_fut = service.run(); - let shutdown_fut = stop_rx.then(|stop| async move { - if stop.is_err() { - // If the handle is dropped we don't want - // this to ever shutdown the service. - futures::future::pending::<()>().await; - } - // Only a successful recv results in a shutdown. - }); - tokio::pin!(run_fut); - tokio::pin!(shutdown_fut); - futures::future::select(shutdown_fut, run_fut).await; - }); - Self { - handle, - shutdown, - bound_address, - #[cfg(feature = "relayer")] - relayer_handle, + /// Creates and starts fuel node instance from service config and a pre-existing database + pub async fn from_database( + database: Database, + config: Config, + ) -> anyhow::Result { + let service = Self::new(database, config)?; + service.runner.start_and_await().await?; + Ok(service) + } + + #[cfg(feature = "relayer")] + /// Wait for the [`Relayer`] to be in sync with + /// the data availability layer. + /// + /// Yields until the relayer reaches a point where it + /// considered up to date. Note that there's no guarantee + /// the relayer will ever catch up to the da layer and + /// may fall behind immediately after this future completes. + /// + /// The only guarantee is that if this future completes then + /// the relayer did reach consistency with the da layer for + /// some period of time. + pub async fn await_relayer_synced(&self) -> anyhow::Result<()> { + if let Some(relayer_handle) = &self.runner.shared.relayer { + relayer_handle.await_synced().await?; } + Ok(()) } // TODO: Rework our configs system to avoid nesting of the same configs. @@ -114,144 +129,131 @@ impl FuelService { config.block_producer.utxo_validation = config.utxo_validation; } } +} - /// Used to initialize a service with a pre-existing database - pub async fn from_database( - database: Database, - config: Config, - ) -> Result { - Ok(Self::spawn_service( - Self::init_service(database, config).await?, - )) +#[async_trait::async_trait] +impl ServiceTrait for FuelService { + fn start(&self) -> anyhow::Result<()> { + self.runner.start() } - /// Private inner method for initializing the fuel service - async fn init_service( - database: Database, - config: Config, - ) -> Result { - // initialize state - Self::initialize_state(&config, &database)?; - - // start modules - let modules = modules::start_modules(&config, &database).await?; - - let (stop_tx, stop_rx) = oneshot::channel(); - // start background tasks - let mut tasks = vec![]; - let (bound_address, api_server) = - graph_api::start_server(config.clone(), database, &modules, stop_rx).await?; - tasks.push(api_server); - // Socket is ignored for now, but as more services are added - // it may be helpful to have a way to list all services and their ports - - Ok(FuelServiceInner { - tasks, - bound_address, - modules, - stop_graphql_api: stop_tx, - }) + async fn start_and_await(&self) -> anyhow::Result { + self.runner.start_and_await().await } - /// Awaits for the completion of any server background tasks - pub async fn run(self) { - Self::wait_for_handle(self.handle).await; + async fn await_start_or_stop(&self) -> anyhow::Result { + self.runner.await_start_or_stop().await } - /// Shutdown background tasks - pub async fn stop(self) { - let Self { - handle, shutdown, .. - } = self; - let _ = shutdown.send(()); - Self::wait_for_handle(handle).await; + fn stop(&self) -> bool { + self.runner.stop() } - async fn wait_for_handle(handle: JoinHandle<()>) { - if let Err(err) = handle.await { - if err.is_panic() { - // Resume the panic on the main task - panic::resume_unwind(err.into_panic()); - } - } + async fn stop_and_await(&self) -> anyhow::Result { + self.runner.stop_and_await().await } - #[cfg(feature = "relayer")] - /// Wait for the [`Relayer`] to be in sync with - /// the data availability layer. - /// - /// Yields until the relayer reaches a point where it - /// considered up to date. Note that there's no guarantee - /// the relayer will ever catch up to the da layer and - /// may fall behind immediately after this future completes. - /// - /// The only guarantee is that if this future completes then - /// the relayer did reach consistency with the da layer for - /// some period of time. - pub async fn await_relayer_synced(&self) -> anyhow::Result<()> { - if let Some(relayer_handle) = &self.relayer_handle { - relayer_handle.await_synced().await?; - } - Ok(()) + async fn await_stop(&self) -> anyhow::Result { + self.runner.await_stop().await + } + + fn state(&self) -> State { + self.runner.state() + } +} + +pub type SubServices = Vec>; + +pub struct Task { + /// The list of started sub services. + services: SubServices, + /// The address bound by the system for serving the API + pub shared: SharedState, +} + +impl Task { + /// Private inner method for initializing the fuel service task + pub fn new(database: Database, config: Config) -> anyhow::Result { + // initialize state + genesis::initialize_state(&config, &database)?; + + // initialize sub services + let (services, shared) = sub_services::init_sub_services(&config, &database)?; + Ok(Task { services, shared }) + } + + #[cfg(test)] + pub fn sub_services(&mut self) -> &mut SubServices { + &mut self.services } } -impl FuelServiceInner { - /// Awaits for the completion of any server background tasks - pub async fn run(self) { - let Self { - tasks, - modules, - stop_graphql_api, - .. - } = self; - let run_fut = Self::run_inner(tasks); - let shutdown_fut = shutdown_signal(stop_graphql_api); - tokio::pin!(run_fut); - tokio::pin!(shutdown_fut); - futures::future::select(shutdown_fut, run_fut).await; - modules.stop().await; +#[async_trait::async_trait] +impl RunnableService for Task { + const NAME: &'static str = "FuelService"; + type SharedData = SharedState; + type Task = Task; + + fn shared_data(&self) -> Self::SharedData { + self.shared.clone() } - /// Awaits for the completion of any server background tasks - async fn run_inner(tasks: Vec>>) { - for task in tasks { - match task.await { - Err(err) => { - if err.is_panic() { - // Resume the panic on the main task - panic::resume_unwind(err.into_panic()); - } - } - Ok(Err(e)) => { - eprintln!("server error: {:?}", e); - } - Ok(Ok(_)) => {} + async fn into_task(self, _: &StateWatcher) -> anyhow::Result { + for service in &self.services { + service.start_and_await().await?; + } + Ok(self) + } +} + +#[async_trait::async_trait] +impl RunnableTask for Task { + async fn run(&mut self) -> anyhow::Result { + let mut stop_signals = vec![]; + for service in &self.services { + stop_signals.push(service.await_stop()) + } + stop_signals.push(Box::pin(shutdown_signal())); + + let (result, _, _) = futures::future::select_all(stop_signals).await; + + if let Err(err) = result { + tracing::error!("Got an error during listen for shutdown: {}", err); + } + + // We received the stop signal from any of one source, so stop this service and + // all sub-services. + for service in &self.services { + let result = service.stop_and_await().await; + + if let Err(err) = result { + tracing::error!( + "Got and error during awaiting for stop of the service: {}", + err + ); } } + + Ok(false /* should_continue */) } } -async fn shutdown_signal(stop_graphql_api: oneshot::Sender<()>) { +async fn shutdown_signal() -> anyhow::Result { #[cfg(unix)] { let mut sigterm = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to install sigterm handler"); + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; let mut sigint = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("failed to install sigint handler"); + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; loop { tokio::select! { _ = sigterm.recv() => { tracing::info!("sigterm received"); - let _ = stop_graphql_api.send(()); break; } _ = sigint.recv() => { tracing::log::info!("sigint received"); - let _ = stop_graphql_api.send(()); break; } } @@ -259,10 +261,69 @@ async fn shutdown_signal(stop_graphql_api: oneshot::Sender<()>) { } #[cfg(not(unix))] { - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - let _ = stop_graphql_api.send(()); + tokio::signal::ctrl_c().await?; tracing::log::info!("CTRL+C received"); } + Ok(State::Stopped) +} + +#[cfg(test)] +mod tests { + use crate::service::{ + Config, + Task, + }; + use fuel_core_services::{ + RunnableService, + RunnableTask, + State, + }; + use std::{ + thread::sleep, + time::Duration, + }; + + #[tokio::test] + async fn run_start_and_stop() { + // The test verify that if we stop any of sub-services + let mut i = 0; + loop { + let task = Task::new(Default::default(), Config::local_node()).unwrap(); + let (_, receiver) = tokio::sync::watch::channel(State::NotStarted); + let mut task = task.into_task(&receiver).await.unwrap(); + sleep(Duration::from_secs(1)); + for service in task.sub_services() { + assert_eq!(service.state(), State::Started); + } + + if i < task.sub_services().len() { + task.sub_services()[i].stop_and_await().await.unwrap(); + assert!(!task.run().await.unwrap()); + + for service in task.sub_services() { + // Check that the state is `Stopped`(not `StoppedWithError`) + assert_eq!(service.state(), State::Stopped); + } + } else { + break + } + i += 1; + } + + #[allow(unused_mut)] + let mut expected_services = 3; + + // Relayer service is disabled with `Config::local_node`. + // #[cfg(feature = "relayer")] + // { + // expected_services += 1; + // } + #[cfg(feature = "p2p")] + { + expected_services += 1; + } + + // # Dev-note: Update the `expected_services` when we add/remove a new/old service. + assert_eq!(i, expected_services); + } } diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index 4017bc0c48e..a242a37e5da 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -30,6 +30,7 @@ impl TxPoolAdapter { } } +#[derive(Clone)] pub struct ExecutorAdapter { pub database: Database, pub config: Config, @@ -38,7 +39,7 @@ pub struct ExecutorAdapter { pub struct MaybeRelayerAdapter { pub database: Database, #[cfg(feature = "relayer")] - pub relayer_synced: Option, + pub relayer_synced: Option, } pub struct BlockProducerAdapter { diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs index b9ead017d23..b633ecb6a6c 100644 --- a/crates/fuel-core/src/service/genesis.rs +++ b/crates/fuel-core/src/service/genesis.rs @@ -1,9 +1,6 @@ use crate::{ database::Database, - service::{ - config::Config, - FuelService, - }, + service::config::Config, }; use anyhow::anyhow; use fuel_core_chain_config::{ @@ -63,287 +60,282 @@ use fuel_core_types::{ }; use itertools::Itertools; -impl FuelService { - /// Loads state from the chain config into database - pub(crate) fn initialize_state( - config: &Config, - database: &Database, - ) -> anyhow::Result<()> { - // check if chain is initialized - if database.get_chain_name()?.is_none() { - // start a db transaction for bulk-writing - let mut import_tx = database.transaction(); - let database = import_tx.as_mut(); - - Self::add_genesis_block(config, database)?; - - // Write transaction to db - import_tx.commit()?; - } - - Ok(()) +/// Loads state from the chain config into database +pub(crate) fn initialize_state( + config: &Config, + database: &Database, +) -> anyhow::Result<()> { + // check if chain is initialized + if database.get_chain_name()?.is_none() { + // start a db transaction for bulk-writing + let mut import_tx = database.transaction(); + let database = import_tx.as_mut(); + + add_genesis_block(config, database)?; + + // Write transaction to db + import_tx.commit()?; } - pub fn add_genesis_block( - config: &Config, - database: &mut Database, - ) -> anyhow::Result<()> { - // Initialize the chain id and height. - database.init(&config.chain_conf)?; - - let chain_config_hash = config.chain_conf.clone().root()?.into(); - let coins_root = - Self::init_coin_state(database, &config.chain_conf.initial_state)?.into(); - let contracts_root = - Self::init_contracts(database, &config.chain_conf.initial_state)?.into(); - let (messages_root, message_ids) = - Self::init_da_messages(database, &config.chain_conf.initial_state)?; - let messages_root = messages_root.into(); - - let genesis = Genesis { - chain_config_hash, - coins_root, - contracts_root, - messages_root, - }; + Ok(()) +} - let block = Block::new( - PartialBlockHeader { - application: ApplicationHeader:: { - da_height: Default::default(), - generated: Empty, - }, - consensus: ConsensusHeader:: { - // The genesis is a first block, so previous root is zero. - prev_root: Bytes32::zeroed(), - // The initial height is defined by the `ChainConfig`. - // If it is `None` then it will be zero. - height: config - .chain_conf - .initial_state - .as_ref() - .map(|config| config.height.unwrap_or_else(|| 0u32.into())) - .unwrap_or_else(|| 0u32.into()), - time: fuel_core_types::tai64::Tai64::UNIX_EPOCH, - generated: Empty, - }, - metadata: None, +fn add_genesis_block(config: &Config, database: &mut Database) -> anyhow::Result<()> { + // Initialize the chain id and height. + database.init(&config.chain_conf)?; + + let chain_config_hash = config.chain_conf.clone().root()?.into(); + let coins_root = init_coin_state(database, &config.chain_conf.initial_state)?.into(); + let contracts_root = + init_contracts(database, &config.chain_conf.initial_state)?.into(); + let (messages_root, message_ids) = + init_da_messages(database, &config.chain_conf.initial_state)?; + let messages_root = messages_root.into(); + + let genesis = Genesis { + chain_config_hash, + coins_root, + contracts_root, + messages_root, + }; + + let block = Block::new( + PartialBlockHeader { + application: ApplicationHeader:: { + da_height: Default::default(), + generated: Empty, + }, + consensus: ConsensusHeader:: { + // The genesis is a first block, so previous root is zero. + prev_root: Bytes32::zeroed(), + // The initial height is defined by the `ChainConfig`. + // If it is `None` then it will be zero. + height: config + .chain_conf + .initial_state + .as_ref() + .map(|config| config.height.unwrap_or_else(|| 0u32.into())) + .unwrap_or_else(|| 0u32.into()), + time: fuel_core_types::tai64::Tai64::UNIX_EPOCH, + generated: Empty, }, - // Genesis block doesn't have any transaction. - vec![], - &message_ids, - ); - - let seal = Consensus::Genesis(genesis); - let block_id = block.id(); - database - .storage::() - .insert(&block_id.into(), &block.compress())?; - database.seal_block(block_id, seal) + metadata: None, + }, + // Genesis block doesn't have any transaction. + vec![], + &message_ids, + ); + + let seal = Consensus::Genesis(genesis); + let block_id = block.id(); + database + .storage::() + .insert(&block_id.into(), &block.compress())?; + database.seal_block(block_id, seal) +} + +fn init_coin_state( + db: &mut Database, + state: &Option, +) -> anyhow::Result { + let mut coins_tree = binary::in_memory::MerkleTree::new(); + // TODO: Store merkle sum tree root over coins with unspecified utxo ids. + let mut generated_output_index: u64 = 0; + if let Some(state) = &state { + if let Some(coins) = &state.coins { + for coin in coins { + let utxo_id = UtxoId::new( + // generated transaction id([0..[out_index/255]]) + coin.tx_id.unwrap_or_else(|| { + Bytes32::try_from( + (0..(Bytes32::LEN - WORD_SIZE)) + .map(|_| 0u8) + .chain( + (generated_output_index / 255) + .to_be_bytes() + .into_iter(), + ) + .collect_vec() + .as_slice(), + ) + .expect("Incorrect genesis transaction id byte length") + }), + coin.output_index.map(|i| i as u8).unwrap_or_else(|| { + generated_output_index += 1; + (generated_output_index % 255) as u8 + }), + ); + + let mut coin = Coin { + owner: coin.owner, + amount: coin.amount, + asset_id: coin.asset_id, + maturity: coin.maturity.unwrap_or_default(), + status: CoinStatus::Unspent, + block_created: coin.block_created.unwrap_or_default(), + }; + + if db.storage::().insert(&utxo_id, &coin)?.is_some() { + return Err(anyhow!("Coin should not exist")) + } + coins_tree.push(coin.root()?.as_slice()) + } + } } + Ok(coins_tree.root()) +} + +fn init_contracts( + db: &mut Database, + state: &Option, +) -> anyhow::Result { + let mut contracts_tree = binary::in_memory::MerkleTree::new(); + // initialize contract state + if let Some(state) = &state { + if let Some(contracts) = &state.contracts { + for (generated_output_index, contract_config) in contracts.iter().enumerate() + { + let contract = Contract::from(contract_config.code.as_slice()); + let salt = contract_config.salt; + let root = contract.root(); + let contract_id = + contract.id(&salt, &root, &Contract::default_state_root()); + // insert contract code + if db + .storage::() + .insert(&contract_id, contract.as_ref())? + .is_some() + { + return Err(anyhow!("Contract code should not exist")) + } - /// initialize coins - pub fn init_coin_state( - db: &mut Database, - state: &Option, - ) -> anyhow::Result { - let mut coins_tree = binary::in_memory::MerkleTree::new(); - // TODO: Store merkle sum tree root over coins with unspecified utxo ids. - let mut generated_output_index: u64 = 0; - if let Some(state) = &state { - if let Some(coins) = &state.coins { - for coin in coins { - let utxo_id = UtxoId::new( - // generated transaction id([0..[out_index/255]]) - coin.tx_id.unwrap_or_else(|| { + // insert contract root + if db + .storage::() + .insert(&contract_id, &(salt, root))? + .is_some() + { + return Err(anyhow!("Contract info should not exist")) + } + if db + .storage::() + .insert( + &contract_id, + &UtxoId::new( + // generated transaction id([0..[out_index/255]]) Bytes32::try_from( (0..(Bytes32::LEN - WORD_SIZE)) .map(|_| 0u8) .chain( - (generated_output_index / 255) + (generated_output_index as u64 / 255) .to_be_bytes() .into_iter(), ) .collect_vec() .as_slice(), ) - .expect("Incorrect genesis transaction id byte length") - }), - coin.output_index.map(|i| i as u8).unwrap_or_else(|| { - generated_output_index += 1; - (generated_output_index % 255) as u8 - }), - ); - - let mut coin = Coin { - owner: coin.owner, - amount: coin.amount, - asset_id: coin.asset_id, - maturity: coin.maturity.unwrap_or_default(), - status: CoinStatus::Unspent, - block_created: coin.block_created.unwrap_or_default(), - }; - - if db.storage::().insert(&utxo_id, &coin)?.is_some() { - return Err(anyhow!("Coin should not exist")) - } - coins_tree.push(coin.root()?.as_slice()) + .expect("Incorrect genesis transaction id byte length"), + generated_output_index as u8, + ), + )? + .is_some() + { + return Err(anyhow!("Contract utxo should not exist")) } + init_contract_state(db, &contract_id, contract_config)?; + init_contract_balance(db, &contract_id, contract_config)?; + contracts_tree + .push(ContractRef::new(&mut *db, contract_id).root()?.as_slice()); } } - Ok(coins_tree.root()) } + Ok(contracts_tree.root()) +} - fn init_contracts( - db: &mut Database, - state: &Option, - ) -> anyhow::Result { - let mut contracts_tree = binary::in_memory::MerkleTree::new(); - // initialize contract state - if let Some(state) = &state { - if let Some(contracts) = &state.contracts { - for (generated_output_index, contract_config) in - contracts.iter().enumerate() - { - let contract = Contract::from(contract_config.code.as_slice()); - let salt = contract_config.salt; - let root = contract.root(); - let contract_id = - contract.id(&salt, &root, &Contract::default_state_root()); - // insert contract code - if db - .storage::() - .insert(&contract_id, contract.as_ref())? - .is_some() - { - return Err(anyhow!("Contract code should not exist")) - } - - // insert contract root - if db - .storage::() - .insert(&contract_id, &(salt, root))? - .is_some() - { - return Err(anyhow!("Contract info should not exist")) - } - if db - .storage::() - .insert( - &contract_id, - &UtxoId::new( - // generated transaction id([0..[out_index/255]]) - Bytes32::try_from( - (0..(Bytes32::LEN - WORD_SIZE)) - .map(|_| 0u8) - .chain( - (generated_output_index as u64 / 255) - .to_be_bytes() - .into_iter(), - ) - .collect_vec() - .as_slice(), - ) - .expect("Incorrect genesis transaction id byte length"), - generated_output_index as u8, - ), - )? - .is_some() - { - return Err(anyhow!("Contract utxo should not exist")) - } - Self::init_contract_state(db, &contract_id, contract_config)?; - Self::init_contract_balance(db, &contract_id, contract_config)?; - contracts_tree - .push(ContractRef::new(&mut *db, contract_id).root()?.as_slice()); - } +fn init_contract_state( + db: &mut Database, + contract_id: &ContractId, + contract: &ContractConfig, +) -> anyhow::Result<()> { + // insert state related to contract + if let Some(contract_state) = &contract.state { + for (key, value) in contract_state { + if db + .storage::() + .insert(&(contract_id, key), value)? + .is_some() + { + return Err(anyhow!("Contract state should not exist")) } } - Ok(contracts_tree.root()) } + Ok(()) +} - fn init_contract_state( - db: &mut Database, - contract_id: &ContractId, - contract: &ContractConfig, - ) -> anyhow::Result<()> { - // insert state related to contract - if let Some(contract_state) = &contract.state { - for (key, value) in contract_state { +fn init_da_messages( + db: &mut Database, + state: &Option, +) -> anyhow::Result<(MerkleRoot, Vec)> { + let mut message_tree = binary::in_memory::MerkleTree::new(); + let mut message_ids = vec![]; + if let Some(state) = &state { + if let Some(message_state) = &state.messages { + for msg in message_state { + let mut message = Message { + sender: msg.sender, + recipient: msg.recipient, + nonce: msg.nonce, + amount: msg.amount, + data: msg.data.clone(), + da_height: msg.da_height, + fuel_block_spend: None, + }; + + let message_id = message.id(); if db - .storage::() - .insert(&(contract_id, key), value)? + .storage::() + .insert(&message_id, &message)? .is_some() { - return Err(anyhow!("Contract state should not exist")) + return Err(anyhow!("Message should not exist")) } + message_tree.push(message.root()?.as_slice()); + message_ids.push(message_id); } } - Ok(()) } - fn init_da_messages( - db: &mut Database, - state: &Option, - ) -> anyhow::Result<(MerkleRoot, Vec)> { - let mut message_tree = binary::in_memory::MerkleTree::new(); - let mut message_ids = vec![]; - if let Some(state) = &state { - if let Some(message_state) = &state.messages { - for msg in message_state { - let mut message = Message { - sender: msg.sender, - recipient: msg.recipient, - nonce: msg.nonce, - amount: msg.amount, - data: msg.data.clone(), - da_height: msg.da_height, - fuel_block_spend: None, - }; - - let message_id = message.id(); - if db - .storage::() - .insert(&message_id, &message)? - .is_some() - { - return Err(anyhow!("Message should not exist")) - } - message_tree.push(message.root()?.as_slice()); - message_ids.push(message_id); - } - } - } - - Ok((message_tree.root(), message_ids)) - } + Ok((message_tree.root(), message_ids)) +} - fn init_contract_balance( - db: &mut Database, - contract_id: &ContractId, - contract: &ContractConfig, - ) -> anyhow::Result<()> { - // insert balances related to contract - if let Some(balances) = &contract.balances { - for (key, value) in balances { - if db - .storage::() - .insert(&(contract_id, key), value)? - .is_some() - { - return Err(anyhow!("Contract balance should not exist")) - } +fn init_contract_balance( + db: &mut Database, + contract_id: &ContractId, + contract: &ContractConfig, +) -> anyhow::Result<()> { + // insert balances related to contract + if let Some(balances) = &contract.balances { + for (key, value) in balances { + if db + .storage::() + .insert(&(contract_id, key), value)? + .is_some() + { + return Err(anyhow!("Contract balance should not exist")) } } - Ok(()) } + Ok(()) } #[cfg(test)] mod tests { use super::*; - use crate::service::config::Config; + use crate::service::{ + config::Config, + FuelService, + }; use fuel_core_chain_config::{ ChainConfig, CoinConfig, @@ -583,7 +575,7 @@ mod tests { let db = &Database::default(); - FuelService::initialize_state(&config, db).unwrap(); + initialize_state(&config, db).unwrap(); let expected_msg: Message = msg.into(); diff --git a/crates/fuel-core/src/service/graph_api.rs b/crates/fuel-core/src/service/graph_api.rs deleted file mode 100644 index cdfdcca760a..00000000000 --- a/crates/fuel-core/src/service/graph_api.rs +++ /dev/null @@ -1,159 +0,0 @@ -use super::modules::Modules; -use crate::{ - database::Database, - schema::{ - build_schema, - dap, - CoreSchema, - }, - service::{ - metrics::metrics, - Config, - }, -}; -use async_graphql::{ - extensions::Tracing, - http::{ - playground_source, - GraphQLPlaygroundConfig, - }, - Request, - Response, -}; -use axum::{ - extract::{ - DefaultBodyLimit, - Extension, - }, - http::{ - header::{ - ACCESS_CONTROL_ALLOW_HEADERS, - ACCESS_CONTROL_ALLOW_METHODS, - ACCESS_CONTROL_ALLOW_ORIGIN, - }, - HeaderValue, - }, - response::{ - sse::Event, - Html, - IntoResponse, - Sse, - }, - routing::{ - get, - post, - }, - Json, - Router, -}; -use futures::Stream; -use serde_json::json; -use std::net::{ - SocketAddr, - TcpListener, -}; -use tokio::{ - sync::oneshot, - task::JoinHandle, -}; -use tokio_stream::StreamExt; -use tower_http::{ - set_header::SetResponseHeaderLayer, - trace::TraceLayer, -}; -use tracing::info; - -/// Spawns the api server for this node -pub async fn start_server( - config: Config, - db: Database, - modules: &Modules, - stop: oneshot::Receiver<()>, -) -> anyhow::Result<(SocketAddr, JoinHandle>)> { - let network_addr = config.addr; - let params = config.chain_conf.transaction_parameters; - let schema = build_schema() - .data(config) - .data(db) - .data(modules.txpool.clone()) - .data(modules.block_producer.clone()) - .data(modules.consensus_module.clone()); - let schema = dap::init(schema, params).extension(Tracing).finish(); - - let router = Router::new() - .route("/playground", get(graphql_playground)) - .route("/graphql", post(graphql_handler).options(ok)) - .route( - "/graphql-sub", - post(graphql_subscription_handler).options(ok), - ) - .route("/metrics", get(metrics)) - .route("/health", get(health)) - .layer(Extension(schema)) - .layer(TraceLayer::new_for_http()) - .layer(SetResponseHeaderLayer::<_>::overriding( - ACCESS_CONTROL_ALLOW_ORIGIN, - HeaderValue::from_static("*"), - )) - .layer(SetResponseHeaderLayer::<_>::overriding( - ACCESS_CONTROL_ALLOW_METHODS, - HeaderValue::from_static("*"), - )) - .layer(SetResponseHeaderLayer::<_>::overriding( - ACCESS_CONTROL_ALLOW_HEADERS, - HeaderValue::from_static("*"), - )) - .layer(DefaultBodyLimit::disable()); - - let (tx, rx) = tokio::sync::oneshot::channel(); - let listener = TcpListener::bind(network_addr)?; - let bound_addr = listener.local_addr().unwrap(); - - info!("Binding GraphQL provider to {}", bound_addr); - let handle = tokio::spawn(async move { - let server = axum::Server::from_tcp(listener) - .unwrap() - .serve(router.into_make_service()) - .with_graceful_shutdown(async move { - let _ = stop.await; - }); - - tx.send(()).unwrap(); - server.await.map_err(Into::into) - }); - - // wait until the server is ready - rx.await.unwrap(); - - Ok((bound_addr, handle)) -} - -async fn graphql_playground() -> impl IntoResponse { - Html(playground_source(GraphQLPlaygroundConfig::new("/graphql"))) -} - -async fn health() -> Json { - Json(json!({ "up": true })) -} - -async fn graphql_handler( - schema: Extension, - req: Json, -) -> Json { - schema.execute(req.0).await.into() -} - -async fn graphql_subscription_handler( - schema: Extension, - req: Json, -) -> Sse>> { - let stream = schema - .execute_stream(req.0) - .map(|r| Ok(Event::default().json_data(r).unwrap())); - Sse::new(stream) - .keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text")) -} - -async fn ok() -> anyhow::Result<(), ()> { - Ok(()) -} diff --git a/crates/fuel-core/src/service/modules.rs b/crates/fuel-core/src/service/sub_services.rs similarity index 62% rename from crates/fuel-core/src/service/modules.rs rename to crates/fuel-core/src/service/sub_services.rs index e86718e1ec8..b6aeaa72f18 100644 --- a/crates/fuel-core/src/service/modules.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -3,6 +3,7 @@ use super::adapters::P2PAdapter; use crate::{ chain_config::BlockProduction, database::Database, + fuel_core_graphql_api::Config as GraphQLConfig, service::{ adapters::{ BlockImportAdapter, @@ -12,9 +13,10 @@ use crate::{ TxPoolAdapter, }, Config, + SharedState, + SubServices, }, }; -use fuel_core_services::Service as ServiceTrait; use fuel_core_txpool::service::TxStatusChange; use std::sync::Arc; use tokio::sync::{ @@ -30,30 +32,12 @@ pub type RelayerService = fuel_core_relayer::Service; #[cfg(feature = "p2p")] pub type P2PService = fuel_core_p2p::service::Service; pub type TxPoolService = fuel_core_txpool::Service; +pub type GraphQL = crate::fuel_core_graphql_api::service::Service; -pub struct Modules { - pub txpool: TxPoolService, - pub block_producer: Arc>, - pub consensus_module: PoAService, - #[cfg(feature = "relayer")] - pub relayer: Option, - #[cfg(feature = "p2p")] - pub network_service: P2PService, -} - -impl Modules { - pub async fn stop(&self) { - self.consensus_module.stop_and_await().await.unwrap(); - self.txpool.stop_and_await().await.unwrap(); - #[cfg(feature = "p2p")] - self.network_service.stop_and_await().await.unwrap(); - } -} - -pub async fn start_modules( +pub fn init_sub_services( config: &Config, database: &Database, -) -> anyhow::Result { +) -> anyhow::Result<(SubServices, SharedState)> { #[cfg(feature = "relayer")] let relayer = if config.relayer.eth_client.is_some() { Some(fuel_core_relayer::new_service( @@ -67,7 +51,7 @@ pub async fn start_modules( let (block_import_tx, _) = broadcast::channel(16); #[cfg(feature = "p2p")] - let network_service = { + let network = { let p2p_db = database.clone(); let genesis = p2p_db.get_genesis()?; @@ -77,7 +61,7 @@ pub async fn start_modules( }; #[cfg(feature = "p2p")] - let p2p_adapter = P2PAdapter::new(network_service.shared.clone()); + let p2p_adapter = P2PAdapter::new(network.shared.clone()); #[cfg(not(feature = "p2p"))] let p2p_adapter = P2PAdapter::new(); @@ -85,7 +69,7 @@ pub async fn start_modules( let importer_adapter = BlockImportAdapter::new(block_import_tx); - let txpool_service = fuel_core_txpool::new_service( + let txpool = fuel_core_txpool::new_service( config.txpool.clone(), database.clone(), TxStatusChange::new(100), @@ -93,17 +77,19 @@ pub async fn start_modules( p2p_adapter, ); + let executor = ExecutorAdapter { + database: database.clone(), + config: config.clone(), + }; + // restrict the max number of concurrent dry runs to the number of CPUs // as execution in the worst case will be CPU bound rather than I/O bound. let max_dry_run_concurrency = num_cpus::get(); let block_producer = Arc::new(fuel_core_producer::Producer { config: config.block_producer.clone(), db: database.clone(), - txpool: Box::new(TxPoolAdapter::new(txpool_service.shared.clone())), - executor: Arc::new(ExecutorAdapter { - database: database.clone(), - config: config.clone(), - }), + txpool: Box::new(TxPoolAdapter::new(txpool.shared.clone())), + executor: Arc::new(executor.clone()), relayer: Box::new(MaybeRelayerAdapter { database: database.clone(), #[cfg(feature = "relayer")] @@ -113,8 +99,6 @@ pub async fn start_modules( dry_run_semaphore: Semaphore::new(max_dry_run_concurrency), }); - // start services - let poa = match &config.chain_conf.block_production { BlockProduction::ProofOfAuthority { trigger } => fuel_core_poa::new_service( fuel_core_poa::Config { @@ -123,7 +107,7 @@ pub async fn start_modules( signing_key: config.consensus_key.clone(), metrics: false, }, - TxPoolAdapter::new(txpool_service.shared.clone()), + TxPoolAdapter::new(txpool.shared.clone()), // TODO: Pass Importer importer_adapter.tx, BlockProducerAdapter { @@ -133,22 +117,51 @@ pub async fn start_modules( ), }; - poa.start()?; + let graph_ql = crate::fuel_core_graphql_api::service::new_service( + GraphQLConfig { + addr: config.addr, + utxo_validation: config.utxo_validation, + manual_blocks_enabled: config.manual_blocks_enabled, + vm_backtrace: config.vm.backtrace, + min_gas_price: config.txpool.min_gas_price, + max_tx: config.txpool.max_tx, + max_depth: config.txpool.max_depth, + transaction_parameters: config.chain_conf.transaction_parameters, + consensus_key: config.consensus_key.clone(), + }, + database.clone(), + block_producer, + txpool.clone(), + executor, + )?; + + let shared = SharedState { + txpool: txpool.shared.clone(), + #[cfg(feature = "p2p")] + network: network.shared.clone(), + #[cfg(feature = "relayer")] + relayer: relayer.as_ref().map(|r| r.shared.clone()), + graph_ql: graph_ql.shared.clone(), + }; + + #[allow(unused_mut)] + // `FuelService` starts and shutdowns all sub-services in the `services` order + let mut services: SubServices = vec![ + // GraphQL should be shutdown first, so let's start it first. + Box::new(graph_ql), + Box::new(poa), + Box::new(txpool), + ]; + #[cfg(feature = "relayer")] - if let Some(relayer) = relayer.as_ref() { - relayer.start().expect("Should start relayer") + if let Some(relayer) = relayer { + services.push(Box::new(relayer)); } - txpool_service.start()?; + #[cfg(feature = "p2p")] - network_service.start()?; + { + services.push(Box::new(network)); + } - Ok(Modules { - txpool: txpool_service, - block_producer, - consensus_module: poa, - #[cfg(feature = "relayer")] - relayer, - #[cfg(feature = "p2p")] - network_service, - }) + Ok((services, shared)) } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index f35deca95db..03d63cb532c 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -19,7 +19,9 @@ use fuel_core_services::{ stream::BoxStream, EmptyShared, RunnableService, + RunnableTask, ServiceRunner, + StateWatcher, }; use fuel_core_storage::transactional::StorageTransaction; use fuel_core_types::{ @@ -275,6 +277,7 @@ where /// Processes the next incoming event. Called by the main event loop. /// Returns Ok(false) if the event loop should stop. async fn process_next_event(&mut self) -> anyhow::Result { + let should_continue; tokio::select! { // TODO: This should likely be refactored to use something like tokio::sync::Notify. // Otherwise, if a bunch of txs are submitted at once and all the txs are included @@ -285,36 +288,35 @@ where txpool_event = self.tx_status_update_stream.next() => { if let Some(txpool_event) = txpool_event { self.on_txpool_event(txpool_event).await.context("While processing txpool event")?; - Ok(true) + should_continue = true; } else { - let should_continue = false; - Ok(should_continue) + should_continue = false; } } at = self.timer.wait() => { self.on_timer(at).await.context("While processing timer event")?; - Ok(true) + should_continue = true; } } + Ok(should_continue) } } #[async_trait::async_trait] impl RunnableService for Task where - D: BlockDb, - T: TransactionPool, - B: BlockProducer, + Self: RunnableTask, { const NAME: &'static str = "PoA"; type SharedData = EmptyShared; + type Task = Task; fn shared_data(&self) -> Self::SharedData { EmptyShared } - async fn initialize(&mut self) -> anyhow::Result<()> { + async fn into_task(self, _: &StateWatcher) -> anyhow::Result { match self.trigger { Trigger::Never | Trigger::Instant => {} Trigger::Interval { block_time } => { @@ -328,9 +330,17 @@ where .await; } }; - Ok(()) + Ok(self) } +} +#[async_trait::async_trait] +impl RunnableTask for Task +where + D: BlockDb, + T: TransactionPool, + B: BlockProducer, +{ async fn run(&mut self) -> anyhow::Result { self.process_next_event().await } diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 895b08d4b71..bdcac288ca3 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -507,7 +507,7 @@ async fn hybrid_production_doesnt_produce_empty_blocks_when_txpool_is_empty() { }; let service = Service::new(task); - service.start().unwrap(); + service.start_and_await().await.unwrap(); // simulate some txpool events to see if any block production is erroneously triggered txpool_tx.send(TxStatus::Submitted).unwrap(); diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 0a7bb5ec97b..b6cbf6083f9 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -22,7 +22,9 @@ use crate::{ use anyhow::anyhow; use fuel_core_services::{ RunnableService, + RunnableTask, ServiceRunner, + StateWatcher, }; use fuel_core_types::{ blockchain::{ @@ -111,20 +113,28 @@ impl Task { #[async_trait::async_trait] impl RunnableService for Task where - D: P2pDb + 'static, + Self: RunnableTask, { const NAME: &'static str = "P2P"; type SharedData = SharedState; + type Task = Task; fn shared_data(&self) -> Self::SharedData { self.shared.clone() } - async fn initialize(&mut self) -> anyhow::Result<()> { - self.p2p_service.start() + async fn into_task(mut self, _: &StateWatcher) -> anyhow::Result { + self.p2p_service.start()?; + Ok(self) } +} +#[async_trait::async_trait] +impl RunnableTask for Task +where + D: P2pDb + 'static, +{ async fn run(&mut self) -> anyhow::Result { tokio::select! { // TODO: Maybe we want to use `biased;` to first process requests asked by us. @@ -373,10 +383,6 @@ pub mod tests { }, primitives::BlockHeight, }; - use tokio::time::{ - sleep, - Duration, - }; #[derive(Clone, Debug)] struct FakeDb; @@ -397,13 +403,12 @@ pub mod tests { } #[tokio::test] - async fn start_stop_works() { + async fn start_and_stop_awaits_works() { let p2p_config = Config::default_initialized("start_stop_works"); let service = new_service(p2p_config, FakeDb); // Node with p2p service started - assert!(service.start().is_ok()); - sleep(Duration::from_secs(1)).await; + assert!(service.start_and_await().await.unwrap().started()); // Node with p2p service stopped assert!(service.stop_and_await().await.unwrap().stopped()); } diff --git a/crates/services/relayer/src/lib.rs b/crates/services/relayer/src/lib.rs index a6b3e1a6fe4..8869abdd119 100644 --- a/crates/services/relayer/src/lib.rs +++ b/crates/services/relayer/src/lib.rs @@ -28,6 +28,6 @@ pub use ethers_core::types::{ }; pub use service::{ new_service, - RelayerSynced, Service, + SharedState, }; diff --git a/crates/services/relayer/src/service.rs b/crates/services/relayer/src/service.rs index 74ccc1ddda0..a9f70f129bf 100644 --- a/crates/services/relayer/src/service.rs +++ b/crates/services/relayer/src/service.rs @@ -23,7 +23,9 @@ use ethers_providers::{ }; use fuel_core_services::{ RunnableService, + RunnableTask, ServiceRunner, + StateWatcher, }; use fuel_core_storage::Result as StorageResult; use fuel_core_types::{ @@ -59,9 +61,10 @@ type NotifySynced = watch::Sender; pub type Service = CustomizableService, D>; type CustomizableService = ServiceRunner>; -/// Receives signals when the relayer reaches consistency with the DA layer. +/// The shared state of the relayer task. #[derive(Clone)] -pub struct RelayerSynced { +pub struct SharedState { + /// Receives signals when the relayer reaches consistency with the DA layer. synced: Synced, } @@ -153,19 +156,27 @@ where { const NAME: &'static str = "Relayer"; - type SharedData = RelayerSynced; + type SharedData = SharedState; + type Task = Task; fn shared_data(&self) -> Self::SharedData { let synced = self.synced.subscribe(); - RelayerSynced { synced } + SharedState { synced } } - async fn initialize(&mut self) -> anyhow::Result<()> { + async fn into_task(mut self, _: &StateWatcher) -> anyhow::Result { self.set_deploy_height().await; - Ok(()) + Ok(self) } +} +#[async_trait] +impl RunnableTask for Task +where + P: Middleware + 'static, + D: RelayerDb + 'static, +{ async fn run(&mut self) -> anyhow::Result { let now = tokio::time::Instant::now(); let result = run::run(self).await; @@ -182,7 +193,7 @@ where } } -impl RelayerSynced { +impl SharedState { /// Wait for the [`Task`] to be in sync with /// the data availability layer. /// diff --git a/crates/services/relayer/tests/integration.rs b/crates/services/relayer/tests/integration.rs index 7b5124c6753..4eb6272dc36 100644 --- a/crates/services/relayer/tests/integration.rs +++ b/crates/services/relayer/tests/integration.rs @@ -22,7 +22,7 @@ async fn can_set_da_height() { // will be some finalized blocks. eth_node.update_data(|data| data.best_block.number = Some(200.into())); let relayer = new_service_test(eth_node, mock_db.clone(), Default::default()); - relayer.start().unwrap(); + relayer.start_and_await().await.unwrap(); relayer.shared.await_synced().await.unwrap(); @@ -54,7 +54,7 @@ async fn can_get_messages() { // will be some finalized blocks. eth_node.update_data(|data| data.best_block.number = Some(200.into())); let relayer = new_service_test(eth_node, mock_db.clone(), config); - relayer.start().unwrap(); + relayer.start_and_await().await.unwrap(); relayer.shared.await_synced().await.unwrap(); @@ -98,7 +98,7 @@ async fn deploy_height_is_set() { } }); let relayer = new_service_test(eth_node, mock_db.clone(), config); - relayer.start().unwrap(); + relayer.start_and_await().await.unwrap(); relayer.shared.await_synced().await.unwrap(); rx.await.unwrap(); diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 74b5975f4f3..9f6df4aa16a 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -21,8 +21,10 @@ pub mod stream { pub use service::{ EmptyShared, RunnableService, + RunnableTask, Service, ServiceRunner, Shared, State, + StateWatcher, }; diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 49c1df31c6d..45d6db639e1 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -6,6 +6,8 @@ use tokio::{ /// Alias for Arc pub type Shared = std::sync::Arc; +/// The alias for `State` watcher to re-export `watch::Receiver`. +pub type StateWatcher = watch::Receiver; /// Used if services have no asynchronously shared data #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -15,10 +17,17 @@ pub struct EmptyShared; /// the lifecycle of services such as start/stop and health status. #[async_trait::async_trait] pub trait Service { - /// Send a start signal to the `ServiceRunner`. Returns an error if the service was already - /// started. + /// Send a start signal to the service without waiting for it to start. + /// Returns an error if the service was already started. fn start(&self) -> anyhow::Result<()>; + /// Send a start signal to the service and wait for it to start up. + /// Returns an error if the service was already started. + async fn start_and_await(&self) -> anyhow::Result; + + /// Wait for service to start or stop (without sending any signal). + async fn await_start_or_stop(&self) -> anyhow::Result; + /// Send a stop signal to the service without waiting for it to shutdown. /// Returns false if the service was already stopped, true if it is running. fn stop(&self) -> bool; @@ -26,7 +35,7 @@ pub trait Service { /// Send stop signal to service and wait for it to shutdown. async fn stop_and_await(&self) -> anyhow::Result; - /// Wait for service to stop (without sending a stop signal) + /// Wait for service to stop (without sending a stop signal). async fn await_stop(&self) -> anyhow::Result; /// The current state of the service (i.e. `Started`, `Stopped`, etc..) @@ -45,14 +54,25 @@ pub trait RunnableService: Send { /// to be shared between asynchronous processes. type SharedData: Clone + Send + Sync; + /// The initialized runnable task type. + type Task: RunnableTask; + /// A cloned instance of the shared data fn shared_data(&self) -> Self::SharedData; - /// Service specific initialization logic before main task run loop. - async fn initialize(&mut self) -> anyhow::Result<()>; + /// Converts the service into a runnable task before the main run loop. + /// + /// The `state` is a `State` watcher of the service. Some tasks may handle state changes + /// on their own. + async fn into_task(self, state: &StateWatcher) -> anyhow::Result; +} - /// This function should contain the main business logic of the service. It will run until the - /// service either returns false, panics or a stop signal is received. +/// The trait is implemented by the service task and contains a single iteration of the infinity +/// loop. +#[async_trait::async_trait] +pub trait RunnableTask: Send { + /// This function should contain the main business logic of the service task. It will run until + /// the service either returns false, panics or a stop signal is received. /// If the service returns an error, it will be logged and execution will resume. /// This is intended to be called only by the `ServiceRunner`. async fn run(&mut self) -> anyhow::Result; @@ -63,6 +83,8 @@ pub trait RunnableService: Send { pub enum State { /// Service is initialized but not started NotStarted, + /// Service is starting up + Starting, /// Service is running as normal Started, /// Service is shutting down @@ -79,6 +101,11 @@ impl State { self == &State::NotStarted } + /// is starting + pub fn starting(&self) -> bool { + self == &State::Starting + } + /// is started pub fn started(&self) -> bool { self == &State::Started @@ -125,24 +152,26 @@ where Self { shared, state } } - async fn await_stop_and_maybe_call(&self, call: F) -> anyhow::Result - where - F: Fn(&Self), - { - let mut stop = self.state.subscribe(); - let state = stop.borrow().clone(); - if state.stopped() { - Ok(state) - } else { - call(self); + async fn _await_start_or_stop( + &self, + mut start: StateWatcher, + ) -> anyhow::Result { + loop { + let state = start.borrow().clone(); + if !state.starting() { + return Ok(state) + } + start.changed().await?; + } + } - loop { - let state = stop.borrow_and_update().clone(); - if state.stopped() { - return Ok(state) - } - stop.changed().await?; + async fn _await_stop(&self, mut stop: StateWatcher) -> anyhow::Result { + loop { + let state = stop.borrow().clone(); + if state.stopped() { + return Ok(state) } + stop.changed().await?; } } } @@ -155,7 +184,7 @@ where fn start(&self) -> anyhow::Result<()> { let started = self.state.send_if_modified(|state| { if state.not_started() { - *state = State::Started; + *state = State::Starting; true } else { false @@ -172,9 +201,20 @@ where } } + async fn start_and_await(&self) -> anyhow::Result { + let start = self.state.subscribe(); + self.start()?; + self._await_start_or_stop(start).await + } + + async fn await_start_or_stop(&self) -> anyhow::Result { + let start = self.state.subscribe(); + self._await_start_or_stop(start).await + } + fn stop(&self) -> bool { self.state.send_if_modified(|state| { - if state.not_started() || state.started() { + if state.not_started() || state.starting() || state.started() { *state = State::Stopping; true } else { @@ -184,14 +224,14 @@ where } async fn stop_and_await(&self) -> anyhow::Result { - self.await_stop_and_maybe_call(|runner| { - runner.stop(); - }) - .await + let stop = self.state.subscribe(); + self.stop(); + self._await_stop(stop).await } async fn await_stop(&self) -> anyhow::Result { - self.await_stop_and_maybe_call(|_| {}).await + let stop = self.state.subscribe(); + self._await_stop(stop).await } fn state(&self) -> State { @@ -204,12 +244,12 @@ fn initialize_loop(service: S) -> Shared> where S: RunnableService + 'static, { - let (sender, receiver) = watch::channel(State::NotStarted); + let (sender, _) = watch::channel(State::NotStarted); let state = Shared::new(sender); let stop_sender = state.clone(); // Spawned as a task to check if the service is already running and to capture any panics. tokio::task::spawn(async move { - let join_handler = run(service, receiver.clone()); + let join_handler = run(service, stop_sender.clone()); let result = join_handler.await; let stopped_state = if let Err(e) = result { @@ -231,26 +271,41 @@ where } /// Spawns a task for the main background run loop. -fn run(mut service: S, mut state: watch::Receiver) -> JoinHandle<()> +fn run(service: S, sender: Shared>) -> JoinHandle<()> where S: RunnableService + 'static, { + let mut state = sender.subscribe(); tokio::task::spawn(async move { if state.borrow_and_update().not_started() { // We can panic here, because it is inside of the task. state.changed().await.expect("The service is destroyed"); } - // If the state after update is not `Started` then return to stop the service. - if !state.borrow().started() { + // If the state after update is not `Starting` then return to stop the service. + if !state.borrow().starting() { return } // We can panic here, because it is inside of the task. - service - .initialize() + let mut task = service + .into_task(&state) .await .expect("The initialization of the service failed."); + + let started = sender.send_if_modified(|s| { + if s.starting() { + *s = State::Started; + true + } else { + false + } + }); + + if !started { + return + } + loop { tokio::select! { biased; @@ -261,7 +316,7 @@ where } } - result = service.run() => { + result = task.run() => { match result { Ok(should_continue) => { if !should_continue { @@ -282,13 +337,8 @@ where // TODO: Add tests #[cfg(test)] mod tests { - use crate::{ - EmptyShared, - RunnableService, - Service as ServiceTrait, - ServiceRunner, - State, - }; + use super::*; + use futures::future::BoxFuture; mockall::mock! { Service {} @@ -298,16 +348,23 @@ mod tests { const NAME: &'static str = "MockService"; type SharedData = EmptyShared; + type Task = MockTask; - fn shared_data(&self) -> EmptyShared { - EmptyShared - } + fn shared_data(&self) -> EmptyShared; - async fn initialize(&mut self) -> anyhow::Result<()> { - Ok(()) - } + async fn into_task(self, state: &StateWatcher) -> anyhow::Result; + } + } - async fn run(&mut self) -> anyhow::Result; + mockall::mock! { + Task {} + + #[async_trait::async_trait] + impl RunnableTask for Task { + fn run<'_self, 'a>(&'_self mut self) -> BoxFuture<'a, anyhow::Result> + where + '_self: 'a, + Self: Sync + 'a; } } @@ -315,12 +372,39 @@ mod tests { fn new_empty() -> Self { let mut mock = MockService::default(); mock.expect_shared_data().returning(|| EmptyShared); - mock.expect_initialize().returning(|| Ok(())); - mock.expect_run().returning(|| Ok(true)); + mock.expect_into_task().returning(|_| { + let mut mock = MockTask::default(); + mock.expect_run() + .returning(|| Box::pin(core::future::pending())); + Ok(mock) + }); mock } } + #[tokio::test] + async fn start_and_await_stop_and_await_works() { + let service = ServiceRunner::new(MockService::new_empty()); + let state = service.start_and_await().await.unwrap(); + assert!(state.started()); + let state = service.stop_and_await().await.unwrap(); + assert!(state.stopped()); + } + + #[tokio::test] + async fn double_start_fails() { + let service = ServiceRunner::new(MockService::new_empty()); + assert!(service.start().is_ok()); + assert!(service.start().is_err()); + } + + #[tokio::test] + async fn double_start_and_await_fails() { + let service = ServiceRunner::new(MockService::new_empty()); + assert!(service.start_and_await().await.is_ok()); + assert!(service.start_and_await().await.is_err()); + } + #[tokio::test] async fn stop_without_start() { let service = ServiceRunner::new(MockService::new_empty()); @@ -331,10 +415,14 @@ mod tests { async fn panic_during_run() { let mut mock = MockService::default(); mock.expect_shared_data().returning(|| EmptyShared); - mock.expect_initialize().returning(|| Ok(())); - mock.expect_run().returning(|| panic!("Should fail")); + mock.expect_into_task().returning(|_| { + let mut mock = MockTask::default(); + mock.expect_run().returning(|| panic!("Should fail")); + Ok(mock) + }); let service = ServiceRunner::new(mock); - service.start().unwrap(); + let state = service.start_and_await().await.unwrap(); + assert!(matches!(state, State::StoppedWithError(_))); let state = service.await_stop().await.unwrap(); assert!(matches!(state, State::StoppedWithError(_))); diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index 222859bd8a9..731e6419506 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -12,7 +12,9 @@ use crate::{ use fuel_core_services::{ stream::BoxStream, RunnableService, + RunnableTask, ServiceRunner, + StateWatcher, }; use fuel_core_types::{ blockchain::SealedBlock, @@ -110,16 +112,25 @@ where const NAME: &'static str = "TxPool"; type SharedData = SharedState; + type Task = Task; fn shared_data(&self) -> Self::SharedData { self.shared.clone() } - async fn initialize(&mut self) -> anyhow::Result<()> { - Ok(()) + async fn into_task(self, _: &StateWatcher) -> anyhow::Result { + Ok(self) } +} +#[async_trait::async_trait] +impl RunnableTask for Task +where + P2P: Send + Sync, + DB: TxPoolDb, +{ async fn run(&mut self) -> anyhow::Result { + let should_continue; tokio::select! { new_transaction = self.gossiped_tx_stream.next() => { if let Some(GossipData { data: Some(tx), .. }) = new_transaction { @@ -128,22 +139,22 @@ where &self.shared.tx_status_sender, &txs ); + should_continue = true; } else { - let should_continue = false; - return Ok(should_continue); + should_continue = false; } } block = self.committed_block_stream.next() => { if let Some(block) = block { self.shared.txpool.lock().block_update(&self.shared.tx_status_sender, block); + should_continue = true; } else { - let should_continue = false; - return Ok(should_continue); + should_continue = false; } } } - Ok(true /* should_continue */) + Ok(should_continue) } } diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index bb14da3d2cf..a5623ea65af 100644 --- a/crates/services/txpool/src/service/test_helpers.rs +++ b/crates/services/txpool/src/service/test_helpers.rs @@ -34,7 +34,7 @@ pub struct TestContext { impl TestContext { pub async fn new() -> Self { - TestContextBuilder::new().build().await + TestContextBuilder::new().build_and_start().await } pub fn service(&self) -> &Service { @@ -162,8 +162,7 @@ impl TestContextBuilder { pub fn setup_coin(&mut self) -> (Coin, Input) { crate::test_helpers::setup_coin(&mut self.rng, Some(&self.mock_db)) } - - pub async fn build(self) -> TestContext { + pub fn build(self) -> TestContext { let rng = RefCell::new(self.rng); let config = Config::default(); let mock_db = self.mock_db; @@ -175,7 +174,6 @@ impl TestContextBuilder { .unwrap_or_else(|| MockImporter::with_blocks(vec![])); let service = new_service(config, mock_db.clone(), status_tx, importer, p2p); - service.start().unwrap(); TestContext { service, @@ -183,4 +181,10 @@ impl TestContextBuilder { rng, } } + + pub async fn build_and_start(self) -> TestContext { + let context = self.build(); + context.service.start_and_await().await.unwrap(); + context + } } diff --git a/crates/services/txpool/src/service/tests.rs b/crates/services/txpool/src/service/tests.rs index 58ce5d11a45..ed1b0646d50 100644 --- a/crates/services/txpool/src/service/tests.rs +++ b/crates/services/txpool/src/service/tests.rs @@ -48,7 +48,7 @@ async fn test_find() { #[tokio::test] async fn simple_insert_removal_subscription() { - let ctx = TestContextBuilder::new().build().await; + let ctx = TestContextBuilder::new().build_and_start().await; let tx1 = Arc::new(ctx.setup_script_tx(10)); let tx2 = Arc::new(ctx.setup_script_tx(20)); diff --git a/crates/services/txpool/src/service/tests_p2p.rs b/crates/services/txpool/src/service/tests_p2p.rs index d7f5ff7ea78..77e101bd928 100644 --- a/crates/services/txpool/src/service/tests_p2p.rs +++ b/crates/services/txpool/src/service/tests_p2p.rs @@ -3,6 +3,7 @@ use crate::service::test_helpers::{ MockP2P, TestContextBuilder, }; +use fuel_core_services::Service; use fuel_core_types::fuel_tx::{ Transaction, UniqueIdentifier, @@ -20,10 +21,12 @@ async fn can_insert_from_p2p() { let p2p = MockP2P::new_with_txs(vec![tx1.clone()]); ctx_builder.with_p2p(p2p); - let ctx = ctx_builder.build().await; + let ctx = ctx_builder.build(); let service = ctx.service(); - let mut receiver = service.shared.tx_update_subscribe(); + + service.start_and_await().await.unwrap(); + let res = receiver.recv().await; assert!(res.is_ok()); @@ -50,7 +53,7 @@ async fn insert_from_local_broadcasts_to_p2p() { ctx_builder.with_p2p(p2p); // build and start the txpool service - let ctx = ctx_builder.build().await; + let ctx = ctx_builder.build_and_start().await; let service = ctx.service(); let mut subscribe_status = service.shared.tx_status_subscribe(); @@ -94,11 +97,13 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { ctx_builder.with_p2p(p2p); // build and start the txpool service - let ctx = ctx_builder.build().await; + let ctx = ctx_builder.build(); let service = ctx.service(); - // verify tx status update from p2p injected tx is successful let mut receiver = service.shared.tx_update_subscribe(); + + service.start_and_await().await.unwrap(); + let res = receiver.recv().await; assert!(res.is_ok()); diff --git a/tests/tests/metrics.rs b/tests/tests/metrics.rs index cd0d8d71918..00535fd1769 100644 --- a/tests/tests/metrics.rs +++ b/tests/tests/metrics.rs @@ -2,6 +2,7 @@ use fuel_core::service::{ Config, DbType, FuelService, + ServiceTrait, }; use fuel_core_client::client::FuelClient; use fuel_core_types::{ @@ -60,7 +61,7 @@ async fn test_metrics_endpoint() { let categories = resp.split('\n').collect::>(); - srv.stop().await; + srv.stop_and_await().await.unwrap(); // Gt check exists because testing can be weird with multiple instances running assert!(categories.len() == 16); diff --git a/tests/tests/relayer.rs b/tests/tests/relayer.rs index 7727b3c7778..6873dc58b34 100644 --- a/tests/tests/relayer.rs +++ b/tests/tests/relayer.rs @@ -10,6 +10,7 @@ use fuel_core::{ service::{ Config, FuelService, + ServiceTrait, }, }; use fuel_core_client::client::{ @@ -112,7 +113,7 @@ async fn relayer_can_download_logs() { &*msg ); } - srv.stop().await; + srv.stop_and_await().await.unwrap(); eth_node_handle.shutdown.send(()).unwrap(); } @@ -217,7 +218,7 @@ async fn messages_are_spendable_after_relayer_is_synced() { Some(1u64) ); - srv.stop().await; + srv.stop_and_await().await.unwrap(); eth_node_handle.shutdown.send(()).unwrap(); }