From 0ff29281ec436b063e3ba8acc21c9434a708064f Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 24 Aug 2024 23:59:23 +0800 Subject: [PATCH] Ensure only one finalize block task at the same time (#46) * Ensure no multiple finalize-block tasks at the same time * Remove major_sync_confirmation_depth * Start subcoin networking after the start of substrate networking * Disable subcoin block sync when substrate fast sync is on * Spawn a task for starting subcoin block sync when substrate state sync is complete * Start subcoin block sync * Fix clippy --- Cargo.lock | 2 + crates/subcoin-network/src/lib.rs | 17 +++ crates/subcoin-network/src/sync.rs | 16 +++ crates/subcoin-network/src/worker.rs | 17 ++- crates/subcoin-node/src/cli.rs | 1 - crates/subcoin-node/src/commands/run.rs | 77 +++++++------ crates/subcoin-service/Cargo.toml | 2 + crates/subcoin-service/src/finalizer.rs | 141 +++++++++++++++++------- crates/subcoin-service/src/lib.rs | 31 ++++++ 9 files changed, 227 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9ccc073a0d90..7ffbdfe7eddf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9244,6 +9244,7 @@ dependencies = [ "futures", "jsonrpsee", "pallet-bitcoin", + "parking_lot 0.12.3", "sc-client-api", "sc-consensus", "sc-consensus-nakamoto", @@ -9273,6 +9274,7 @@ dependencies = [ "sp-storage 19.0.0 (git+https://github.com/subcoin-project/polkadot-sdk?branch=subcoin-v1)", "sp-tracing 16.0.0 (git+https://github.com/subcoin-project/polkadot-sdk?branch=subcoin-v1)", "sp-trie", + "subcoin-network", "subcoin-primitives", "subcoin-runtime", "subcoin-test-service", diff --git a/crates/subcoin-network/src/lib.rs b/crates/subcoin-network/src/lib.rs index a21c527e4fd4c..0226d0ba5ac68 100644 --- a/crates/subcoin-network/src/lib.rs +++ b/crates/subcoin-network/src/lib.rs @@ -226,6 +226,8 @@ enum NetworkWorkerMessage { GetTransaction((Txid, oneshot::Sender>)), /// Add transaction to the transaction manager. SendTransaction((IncomingTransaction, oneshot::Sender)), + /// Enable the block sync in the chain sync component. + StartBlockSync, } /// A handle for interacting with the network worker. @@ -306,6 +308,12 @@ impl NetworkHandle { .unwrap_or(SendTransactionResult::Failure("Internal error".to_string())) } + pub fn start_block_sync(&self) -> bool { + self.worker_msg_sender + .unbounded_send(NetworkWorkerMessage::StartBlockSync) + .is_ok() + } + /// Returns a flag indicating whether the node is actively performing a major sync. pub fn is_major_syncing(&self) -> Arc { self.is_major_syncing.clone() @@ -330,6 +338,8 @@ pub struct Params { pub max_inbound_peers: usize, /// Major sync strategy. pub sync_strategy: SyncStrategy, + /// Whether the Substrate fast sync is enabled. + pub substrate_fast_sync_enabled: bool, } fn builtin_seednodes(network: BitcoinNetwork) -> &'static [&'static str] { @@ -449,6 +459,12 @@ where params.ipv4_only, ); + let enable_block_sync = !params.substrate_fast_sync_enabled; + + if !enable_block_sync { + tracing::info!("Subcoin block sync is disabled until Substrate fast sync is complete"); + } + let network_worker = NetworkWorker::new( worker::Params { client: client.clone(), @@ -458,6 +474,7 @@ where is_major_syncing, connection_initiator: connection_initiator.clone(), max_outbound_peers: params.max_outbound_peers, + enable_block_sync, }, registry.as_ref(), ); diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 4689edf40eb76..a8d0a343ebb80 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -152,8 +152,13 @@ pub(crate) struct ChainSync { syncing: Syncing, /// Handle of the import queue. import_queue: BlockImportQueue, + /// Block syncing strategy. sync_strategy: SyncStrategy, + /// Are we in major syncing? is_major_syncing: Arc, + /// Whether to sync blocks from Bitcoin network. + enable_block_sync: bool, + /// Randomness generator. rng: fastrand::Rng, _phantom: PhantomData, } @@ -169,6 +174,7 @@ where import_queue: BlockImportQueue, sync_strategy: SyncStrategy, is_major_syncing: Arc, + enable_block_sync: bool, ) -> Self { Self { client, @@ -177,6 +183,7 @@ where syncing: Syncing::Idle, sync_strategy, is_major_syncing, + enable_block_sync, rng: fastrand::Rng::new(), _phantom: Default::default(), } @@ -344,6 +351,15 @@ where self.peers.insert(peer_id, new_peer); + if self.enable_block_sync { + self.attempt_sync_start() + } else { + SyncAction::None + } + } + + pub(super) fn start_block_sync(&mut self) -> SyncAction { + self.enable_block_sync = true; self.attempt_sync_start() } diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index 1fd33f3e631aa..2157928637439 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -55,6 +55,8 @@ pub struct Params { pub is_major_syncing: Arc, pub connection_initiator: ConnectionInitiator, pub max_outbound_peers: usize, + /// Whether to enable block sync on start. + pub enable_block_sync: bool, } /// [`NetworkWorker`] is responsible for processing the network events. @@ -82,6 +84,7 @@ where is_major_syncing, connection_initiator, max_outbound_peers, + enable_block_sync, } = params; let config = Config::new(); @@ -101,11 +104,19 @@ where metrics.clone(), ); + let chain_sync = ChainSync::new( + client, + import_queue, + sync_strategy, + is_major_syncing, + enable_block_sync, + ); + Self { network_event_receiver, peer_manager, transaction_manager: TransactionManager::new(), - chain_sync: ChainSync::new(client, import_queue, sync_strategy, is_major_syncing), + chain_sync, metrics, config, } @@ -262,6 +273,10 @@ where }; let _ = result_sender.send(send_transaction_result); } + NetworkWorkerMessage::StartBlockSync => { + let sync_action = self.chain_sync.start_block_sync(); + self.do_sync_action(sync_action); + } } } diff --git a/crates/subcoin-node/src/cli.rs b/crates/subcoin-node/src/cli.rs index 7c1706ecaadcf..d8629dd3047fd 100644 --- a/crates/subcoin-node/src/cli.rs +++ b/crates/subcoin-node/src/cli.rs @@ -131,7 +131,6 @@ pub fn run() -> sc_cli::Result<()> { client, spawn_handle, CONFIRMATION_DEPTH, - 100, is_major_syncing, None, ) diff --git a/crates/subcoin-node/src/commands/run.rs b/crates/subcoin-node/src/commands/run.rs index 27895e8c92c13..8a41fddcf88be 100644 --- a/crates/subcoin-node/src/commands/run.rs +++ b/crates/subcoin-node/src/commands/run.rs @@ -2,7 +2,7 @@ use crate::cli::params::{CommonParams, NetworkParams}; use clap::Parser; use sc_cli::{ ImportParams, NetworkParams as SubstrateNetworkParams, NodeKeyParams, PrometheusParams, Role, - SharedParams, + SharedParams, SyncMode, }; use sc_client_api::UsageProvider; use sc_consensus_nakamoto::BitcoinBlockImporter; @@ -18,13 +18,6 @@ pub struct Run { #[clap(long, default_value = "headers-first")] pub sync_strategy: SyncStrategy, - /// Specify the confirmation depth during the major sync. - /// - /// If you encounter a high memory usage when the node is major syncing, try to - /// specify a smaller number. - #[clap(long, default_value = "100")] - pub major_sync_confirmation_depth: u32, - /// Do not run the finalizer which will finalize the blocks on confirmation depth. #[clap(long)] pub no_finalizer: bool, @@ -56,6 +49,10 @@ pub struct Run { impl Run { pub fn subcoin_network_params(&self, network: bitcoin::Network) -> subcoin_network::Params { + let substrate_fast_sync_enabled = matches!( + self.substrate_network_params.sync, + SyncMode::Fast | SyncMode::FastUnsafe + ); subcoin_network::Params { network, listen_on: self.network_params.listen, @@ -65,6 +62,7 @@ impl Run { max_outbound_peers: self.network_params.max_outbound_peers, max_inbound_peers: self.network_params.max_inbound_peers, sync_strategy: self.sync_strategy, + substrate_fast_sync_enabled, } } } @@ -100,7 +98,6 @@ impl RunCmd { let bitcoin_network = run.common_params.bitcoin_network(); let import_config = run.common_params.import_config(); let no_finalizer = run.no_finalizer; - let major_sync_confirmation_depth = run.major_sync_confirmation_depth; let subcoin_service::NodeComponents { client, @@ -138,29 +135,6 @@ impl RunCmd { bitcoin_block_import, ); - let (subcoin_networking, subcoin_network_handle) = subcoin_network::Network::new( - client.clone(), - run.subcoin_network_params(bitcoin_network), - import_queue, - spawn_handle.clone(), - config.prometheus_registry().cloned(), - ); - - // TODO: handle Substrate networking and Bitcoin networking properly. - if !run.disable_subcoin_networking { - task_manager.spawn_essential_handle().spawn_blocking( - "subcoin-networking", - None, - async move { - if let Err(err) = subcoin_networking.run().await { - tracing::error!(?err, "Error occurred in subcoin networking"); - } - }, - ); - } else { - task_manager.keep_alive(subcoin_networking); - } - let (system_rpc_tx, substrate_sync_service) = match config.network.network_backend { sc_network::config::NetworkBackendType::Libp2p => { subcoin_service::start_substrate_network::< @@ -189,6 +163,44 @@ impl RunCmd { } }; + let subcoin_network_params = run.subcoin_network_params(bitcoin_network); + + let substrate_fast_sync_enabled = subcoin_network_params.substrate_fast_sync_enabled; + + let (subcoin_networking, subcoin_network_handle) = subcoin_network::Network::new( + client.clone(), + subcoin_network_params, + import_queue, + spawn_handle.clone(), + config.prometheus_registry().cloned(), + ); + + // TODO: handle Substrate networking and Bitcoin networking properly. + if !run.disable_subcoin_networking { + task_manager.spawn_essential_handle().spawn_blocking( + "subcoin-networking", + None, + async move { + if let Err(err) = subcoin_networking.run().await { + tracing::error!(?err, "Error occurred in subcoin networking"); + } + }, + ); + + if substrate_fast_sync_enabled { + task_manager.spawn_handle().spawn( + "substrate-fast-sync-watcher", + None, + subcoin_service::watch_substrate_fast_sync( + subcoin_network_handle.clone(), + substrate_sync_service.clone(), + ), + ); + } + } else { + task_manager.keep_alive(subcoin_networking); + } + // TODO: Bitcoin-compatible RPC // Start JSON-RPC server. let gen_rpc_module = |deny_unsafe: sc_rpc::DenyUnsafe| { @@ -220,7 +232,6 @@ impl RunCmd { client.clone(), spawn_handle.clone(), CONFIRMATION_DEPTH, - major_sync_confirmation_depth, subcoin_network_handle.is_major_syncing(), Some(substrate_sync_service), ) diff --git a/crates/subcoin-service/Cargo.toml b/crates/subcoin-service/Cargo.toml index 254d08cb1271b..9a106e1d18e63 100644 --- a/crates/subcoin-service/Cargo.toml +++ b/crates/subcoin-service/Cargo.toml @@ -14,6 +14,7 @@ frame-system = { workspace = true } futures = { workspace = true } jsonrpsee = { workspace = true } pallet-bitcoin = { workspace = true } +parking_lot = { workspace = true } sc-client-api = { workspace = true } sc-consensus = { workspace = true } sc-consensus-nakamoto = { workspace = true } @@ -43,6 +44,7 @@ sp-runtime = { workspace = true } sp-state-machine = { workspace = true } sp-storage = { workspace = true } sp-trie = { workspace = true } +subcoin-network = { workspace = true } subcoin-primitives = { workspace = true } subcoin-runtime = { workspace = true } substrate-frame-rpc-system = { workspace = true } diff --git a/crates/subcoin-service/src/finalizer.rs b/crates/subcoin-service/src/finalizer.rs index 5e748eee4edc9..6e60866d101d9 100644 --- a/crates/subcoin-service/src/finalizer.rs +++ b/crates/subcoin-service/src/finalizer.rs @@ -1,19 +1,21 @@ use futures::StreamExt; +use parking_lot::Mutex; use sc_client_api::{BlockchainEvents, Finalizer, HeaderBackend}; use sc_network_sync::SyncingService; use sc_service::SpawnTaskHandle; use sp_consensus::SyncOracle; -use sp_runtime::traits::{Block as BlockT, CheckedSub}; +use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor}; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +type BlockInfo = (NumberFor, ::Hash); + /// This struct is responsible for finalizing blocks with enough confirmations. pub struct SubcoinFinalizer { client: Arc, spawn_handle: SpawnTaskHandle, confirmation_depth: u32, - major_sync_confirmation_depth: u32, subcoin_networking_is_major_syncing: Arc, substrate_sync_service: Option>>, _phantom: PhantomData, @@ -30,7 +32,6 @@ where client: Arc, spawn_handle: SpawnTaskHandle, confirmation_depth: u32, - major_sync_confirmation_depth: u32, subcoin_networking_is_major_syncing: Arc, substrate_sync_service: Option>>, ) -> Self { @@ -38,7 +39,6 @@ where client, spawn_handle, confirmation_depth, - major_sync_confirmation_depth, subcoin_networking_is_major_syncing, substrate_sync_service, _phantom: Default::default(), @@ -51,7 +51,6 @@ where client, spawn_handle, confirmation_depth, - major_sync_confirmation_depth, subcoin_networking_is_major_syncing, substrate_sync_service, _phantom, @@ -61,6 +60,11 @@ where // the Substrate network is major syncing. let mut block_import_stream = client.every_import_notification_stream(); + let cached_block_to_finalize: Arc>>> = + Arc::new(Mutex::new(None)); + + let finalizer_worker_is_busy = Arc::new(AtomicBool::new(false)); + while let Some(notification) = block_import_stream.next().await { let block_number = client .number(notification.hash) @@ -79,15 +83,26 @@ where continue; } - if subcoin_networking_is_major_syncing.load(Ordering::Relaxed) { - // During the major sync of Subcoin networking, we choose to finalize every `major_sync_confirmation_depth` - // block to avoid race conditions: - // >Safety violation: attempted to revert finalized block... - if confirmed_block_number < finalized_number + major_sync_confirmation_depth.into() - { - continue; + let confirmed_block_hash = client + .hash(confirmed_block_number) + .ok() + .flatten() + .expect("Confirmed block must be available; qed"); + + let try_update_cached_block_to_finalize = || { + let mut cached_block_to_finalize = cached_block_to_finalize.lock(); + + let should_update = cached_block_to_finalize + .map(|(cached_block_number, _)| confirmed_block_number > cached_block_number) + .unwrap_or(true); + + if should_update { + cached_block_to_finalize + .replace((confirmed_block_number, confirmed_block_hash)); } - } + + drop(cached_block_to_finalize); + }; if let Some(sync_service) = substrate_sync_service.as_ref() { // State sync relies on the finalized block notification to progress @@ -98,52 +113,94 @@ where if sync_service.is_major_syncing() && sync_service.num_queued_blocks().await.unwrap_or(0) > 0 { + try_update_cached_block_to_finalize(); continue; } } - let confirmed_block_hash = client - .hash(confirmed_block_number) - .ok() - .flatten() - .expect("Confirmed block must be available; qed"); + if finalizer_worker_is_busy.load(Ordering::SeqCst) { + try_update_cached_block_to_finalize(); + continue; + } let client = client.clone(); let subcoin_networking_is_major_syncing = subcoin_networking_is_major_syncing.clone(); let substrate_sync_service = substrate_sync_service.clone(); + let finalizer_worker_is_busy = finalizer_worker_is_busy.clone(); + let cached_block_to_finalize = cached_block_to_finalize.clone(); + + finalizer_worker_is_busy.store(true, Ordering::SeqCst); spawn_handle.spawn( "finalize-block", None, Box::pin(async move { - let finalized_number = client.info().finalized_number; - - if confirmed_block_number <= finalized_number { - return; + do_finalize_block( + &client, + confirmed_block_number, + confirmed_block_hash, + &subcoin_networking_is_major_syncing, + substrate_sync_service.as_ref(), + ); + + let mut cached_block_to_finalize = cached_block_to_finalize.lock(); + let maybe_cached_block_to_finalize = cached_block_to_finalize.take(); + drop(cached_block_to_finalize); + + if let Some((cached_block_number, cached_block_hash)) = + maybe_cached_block_to_finalize + { + do_finalize_block( + &client, + cached_block_number, + cached_block_hash, + &subcoin_networking_is_major_syncing, + substrate_sync_service.as_ref(), + ); } - match client.finalize_block(confirmed_block_hash, None, true) { - Ok(()) => { - let is_major_syncing = subcoin_networking_is_major_syncing.load(Ordering::Relaxed) - || substrate_sync_service - .map(|sync_service| sync_service.is_major_syncing()) - .unwrap_or(false); - - // Only print the log when not major syncing to not clutter the logs. - if !is_major_syncing { - tracing::info!("✅ Successfully finalized block #{confirmed_block_number},{confirmed_block_hash}"); - } - } - Err(err) => { - tracing::warn!( - ?err, - ?finalized_number, - "Failed to finalize block #{confirmed_block_number},{confirmed_block_hash}", - ); - } - } + finalizer_worker_is_busy.store(false, Ordering::SeqCst); }), ); } } } + +fn do_finalize_block( + client: &Arc, + confirmed_block_number: NumberFor, + confirmed_block_hash: Block::Hash, + subcoin_networking_is_major_syncing: &Arc, + substrate_sync_service: Option<&Arc>>, +) where + Block: BlockT, + Client: HeaderBackend + Finalizer, + Backend: sc_client_api::backend::Backend, +{ + let finalized_number = client.info().finalized_number; + + if confirmed_block_number <= finalized_number { + return; + } + + match client.finalize_block(confirmed_block_hash, None, true) { + Ok(()) => { + let is_major_syncing = subcoin_networking_is_major_syncing.load(Ordering::Relaxed) + || substrate_sync_service + .map(|sync_service| sync_service.is_major_syncing()) + .unwrap_or(false); + + // Only print the log when not major syncing to not clutter the logs. + if !is_major_syncing { + tracing::info!("✅ Successfully finalized block #{confirmed_block_number},{confirmed_block_hash}"); + } + } + Err(err) => { + tracing::warn!( + ?err, + ?finalized_number, + "Failed to finalize block #{confirmed_block_number},{confirmed_block_hash}", + ); + } + } +} diff --git a/crates/subcoin-service/src/lib.rs b/crates/subcoin-service/src/lib.rs index 90fb46d3660f5..6907d1b78dea8 100644 --- a/crates/subcoin-service/src/lib.rs +++ b/crates/subcoin-service/src/lib.rs @@ -371,6 +371,37 @@ where Ok((system_rpc_tx, sync_service)) } +/// Watch the Substrate sync status and enable the subcoin block sync when the Substate +/// state sync is finished. +pub async fn watch_substrate_fast_sync( + subcoin_network_handle: subcoin_network::NetworkHandle, + substate_sync_service: Arc>, +) { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); + + let mut state_sync_has_started = false; + + loop { + interval.tick().await; + + let state_sync_is_active = substate_sync_service + .status() + .await + .map(|status| status.state_sync.is_some()) + .unwrap_or(false); + + if state_sync_is_active { + if !state_sync_has_started { + state_sync_has_started = true; + } + } else if state_sync_has_started { + tracing::info!("Detected state sync is complete, starting Subcoin block sync"); + subcoin_network_handle.start_block_sync(); + return; + } + } +} + type PartialComponents = sc_service::PartialComponents< FullClient, FullBackend,