Skip to content

Commit

Permalink
Ensure only one finalize block task at the same time (paritytech#46)
Browse files Browse the repository at this point in the history
* Ensure no multiple finalize-block tasks at the same time

* Remove major_sync_confirmation_depth

* Start subcoin networking after the start of substrate networking

* Disable subcoin block sync when substrate fast sync is on

* Spawn a task for starting subcoin block sync when substrate state sync is complete

* Start subcoin block sync

* Fix clippy
  • Loading branch information
liuchengxu authored Aug 24, 2024
1 parent d1e7f42 commit 0ff2928
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions crates/subcoin-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ enum NetworkWorkerMessage {
GetTransaction((Txid, oneshot::Sender<Option<Transaction>>)),
/// Add transaction to the transaction manager.
SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
/// Enable the block sync in the chain sync component.
StartBlockSync,
}

/// A handle for interacting with the network worker.
Expand Down Expand Up @@ -306,6 +308,12 @@ impl NetworkHandle {
.unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
}

pub fn start_block_sync(&self) -> bool {
self.worker_msg_sender
.unbounded_send(NetworkWorkerMessage::StartBlockSync)
.is_ok()
}

/// Returns a flag indicating whether the node is actively performing a major sync.
pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
self.is_major_syncing.clone()
Expand All @@ -330,6 +338,8 @@ pub struct Params {
pub max_inbound_peers: usize,
/// Major sync strategy.
pub sync_strategy: SyncStrategy,
/// Whether the Substrate fast sync is enabled.
pub substrate_fast_sync_enabled: bool,
}

fn builtin_seednodes(network: BitcoinNetwork) -> &'static [&'static str] {
Expand Down Expand Up @@ -449,6 +459,12 @@ where
params.ipv4_only,
);

let enable_block_sync = !params.substrate_fast_sync_enabled;

if !enable_block_sync {
tracing::info!("Subcoin block sync is disabled until Substrate fast sync is complete");
}

let network_worker = NetworkWorker::new(
worker::Params {
client: client.clone(),
Expand All @@ -458,6 +474,7 @@ where
is_major_syncing,
connection_initiator: connection_initiator.clone(),
max_outbound_peers: params.max_outbound_peers,
enable_block_sync,
},
registry.as_ref(),
);
Expand Down
16 changes: 16 additions & 0 deletions crates/subcoin-network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ pub(crate) struct ChainSync<Block, Client> {
syncing: Syncing<Block, Client>,
/// Handle of the import queue.
import_queue: BlockImportQueue,
/// Block syncing strategy.
sync_strategy: SyncStrategy,
/// Are we in major syncing?
is_major_syncing: Arc<AtomicBool>,
/// Whether to sync blocks from Bitcoin network.
enable_block_sync: bool,
/// Randomness generator.
rng: fastrand::Rng,
_phantom: PhantomData<Block>,
}
Expand All @@ -169,6 +174,7 @@ where
import_queue: BlockImportQueue,
sync_strategy: SyncStrategy,
is_major_syncing: Arc<AtomicBool>,
enable_block_sync: bool,
) -> Self {
Self {
client,
Expand All @@ -177,6 +183,7 @@ where
syncing: Syncing::Idle,
sync_strategy,
is_major_syncing,
enable_block_sync,
rng: fastrand::Rng::new(),
_phantom: Default::default(),
}
Expand Down Expand Up @@ -344,6 +351,15 @@ where

self.peers.insert(peer_id, new_peer);

if self.enable_block_sync {
self.attempt_sync_start()
} else {
SyncAction::None
}
}

pub(super) fn start_block_sync(&mut self) -> SyncAction {
self.enable_block_sync = true;
self.attempt_sync_start()
}

Expand Down
17 changes: 16 additions & 1 deletion crates/subcoin-network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct Params<Client> {
pub is_major_syncing: Arc<AtomicBool>,
pub connection_initiator: ConnectionInitiator,
pub max_outbound_peers: usize,
/// Whether to enable block sync on start.
pub enable_block_sync: bool,
}

/// [`NetworkWorker`] is responsible for processing the network events.
Expand Down Expand Up @@ -82,6 +84,7 @@ where
is_major_syncing,
connection_initiator,
max_outbound_peers,
enable_block_sync,
} = params;

let config = Config::new();
Expand All @@ -101,11 +104,19 @@ where
metrics.clone(),
);

let chain_sync = ChainSync::new(
client,
import_queue,
sync_strategy,
is_major_syncing,
enable_block_sync,
);

Self {
network_event_receiver,
peer_manager,
transaction_manager: TransactionManager::new(),
chain_sync: ChainSync::new(client, import_queue, sync_strategy, is_major_syncing),
chain_sync,
metrics,
config,
}
Expand Down Expand Up @@ -262,6 +273,10 @@ where
};
let _ = result_sender.send(send_transaction_result);
}
NetworkWorkerMessage::StartBlockSync => {
let sync_action = self.chain_sync.start_block_sync();
self.do_sync_action(sync_action);
}
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/subcoin-node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ pub fn run() -> sc_cli::Result<()> {
client,
spawn_handle,
CONFIRMATION_DEPTH,
100,
is_major_syncing,
None,
)
Expand Down
77 changes: 44 additions & 33 deletions crates/subcoin-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::cli::params::{CommonParams, NetworkParams};
use clap::Parser;
use sc_cli::{
ImportParams, NetworkParams as SubstrateNetworkParams, NodeKeyParams, PrometheusParams, Role,
SharedParams,
SharedParams, SyncMode,
};
use sc_client_api::UsageProvider;
use sc_consensus_nakamoto::BitcoinBlockImporter;
Expand All @@ -18,13 +18,6 @@ pub struct Run {
#[clap(long, default_value = "headers-first")]
pub sync_strategy: SyncStrategy,

/// Specify the confirmation depth during the major sync.
///
/// If you encounter a high memory usage when the node is major syncing, try to
/// specify a smaller number.
#[clap(long, default_value = "100")]
pub major_sync_confirmation_depth: u32,

/// Do not run the finalizer which will finalize the blocks on confirmation depth.
#[clap(long)]
pub no_finalizer: bool,
Expand Down Expand Up @@ -56,6 +49,10 @@ pub struct Run {

impl Run {
pub fn subcoin_network_params(&self, network: bitcoin::Network) -> subcoin_network::Params {
let substrate_fast_sync_enabled = matches!(
self.substrate_network_params.sync,
SyncMode::Fast | SyncMode::FastUnsafe
);
subcoin_network::Params {
network,
listen_on: self.network_params.listen,
Expand All @@ -65,6 +62,7 @@ impl Run {
max_outbound_peers: self.network_params.max_outbound_peers,
max_inbound_peers: self.network_params.max_inbound_peers,
sync_strategy: self.sync_strategy,
substrate_fast_sync_enabled,
}
}
}
Expand Down Expand Up @@ -100,7 +98,6 @@ impl RunCmd {
let bitcoin_network = run.common_params.bitcoin_network();
let import_config = run.common_params.import_config();
let no_finalizer = run.no_finalizer;
let major_sync_confirmation_depth = run.major_sync_confirmation_depth;

let subcoin_service::NodeComponents {
client,
Expand Down Expand Up @@ -138,29 +135,6 @@ impl RunCmd {
bitcoin_block_import,
);

let (subcoin_networking, subcoin_network_handle) = subcoin_network::Network::new(
client.clone(),
run.subcoin_network_params(bitcoin_network),
import_queue,
spawn_handle.clone(),
config.prometheus_registry().cloned(),
);

// TODO: handle Substrate networking and Bitcoin networking properly.
if !run.disable_subcoin_networking {
task_manager.spawn_essential_handle().spawn_blocking(
"subcoin-networking",
None,
async move {
if let Err(err) = subcoin_networking.run().await {
tracing::error!(?err, "Error occurred in subcoin networking");
}
},
);
} else {
task_manager.keep_alive(subcoin_networking);
}

let (system_rpc_tx, substrate_sync_service) = match config.network.network_backend {
sc_network::config::NetworkBackendType::Libp2p => {
subcoin_service::start_substrate_network::<
Expand Down Expand Up @@ -189,6 +163,44 @@ impl RunCmd {
}
};

let subcoin_network_params = run.subcoin_network_params(bitcoin_network);

let substrate_fast_sync_enabled = subcoin_network_params.substrate_fast_sync_enabled;

let (subcoin_networking, subcoin_network_handle) = subcoin_network::Network::new(
client.clone(),
subcoin_network_params,
import_queue,
spawn_handle.clone(),
config.prometheus_registry().cloned(),
);

// TODO: handle Substrate networking and Bitcoin networking properly.
if !run.disable_subcoin_networking {
task_manager.spawn_essential_handle().spawn_blocking(
"subcoin-networking",
None,
async move {
if let Err(err) = subcoin_networking.run().await {
tracing::error!(?err, "Error occurred in subcoin networking");
}
},
);

if substrate_fast_sync_enabled {
task_manager.spawn_handle().spawn(
"substrate-fast-sync-watcher",
None,
subcoin_service::watch_substrate_fast_sync(
subcoin_network_handle.clone(),
substrate_sync_service.clone(),
),
);
}
} else {
task_manager.keep_alive(subcoin_networking);
}

// TODO: Bitcoin-compatible RPC
// Start JSON-RPC server.
let gen_rpc_module = |deny_unsafe: sc_rpc::DenyUnsafe| {
Expand Down Expand Up @@ -220,7 +232,6 @@ impl RunCmd {
client.clone(),
spawn_handle.clone(),
CONFIRMATION_DEPTH,
major_sync_confirmation_depth,
subcoin_network_handle.is_major_syncing(),
Some(substrate_sync_service),
)
Expand Down
2 changes: 2 additions & 0 deletions crates/subcoin-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ frame-system = { workspace = true }
futures = { workspace = true }
jsonrpsee = { workspace = true }
pallet-bitcoin = { workspace = true }
parking_lot = { workspace = true }
sc-client-api = { workspace = true }
sc-consensus = { workspace = true }
sc-consensus-nakamoto = { workspace = true }
Expand Down Expand Up @@ -43,6 +44,7 @@ sp-runtime = { workspace = true }
sp-state-machine = { workspace = true }
sp-storage = { workspace = true }
sp-trie = { workspace = true }
subcoin-network = { workspace = true }
subcoin-primitives = { workspace = true }
subcoin-runtime = { workspace = true }
substrate-frame-rpc-system = { workspace = true }
Expand Down
Loading

0 comments on commit 0ff2928

Please sign in to comment.