From 4b6bfb251e9d7d6254c3690b6b11c6dc031f5f79 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Sun, 27 Nov 2022 19:19:56 +0000 Subject: [PATCH] wait for relay chain to sync then get parachain header --- Cargo.lock | 1 + client/consensus/common/src/lib.rs | 2 +- client/network/Cargo.toml | 1 + client/network/src/lib.rs | 73 +++++++++++++++++++++++++- parachain-template/node/src/service.rs | 14 ++++- polkadot-parachain/src/service.rs | 38 ++++++++++++-- test/service/src/lib.rs | 16 +++++- 7 files changed, 135 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a97bba199ea..f5f2473c291 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1622,6 +1622,7 @@ name = "cumulus-client-network" version = "0.1.0" dependencies = [ "async-trait", + "cumulus-client-consensus-common", "cumulus-primitives-core", "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", diff --git a/client/consensus/common/src/lib.rs b/client/consensus/common/src/lib.rs index d5d33585439..9f6ab7bec45 100644 --- a/client/consensus/common/src/lib.rs +++ b/client/consensus/common/src/lib.rs @@ -18,7 +18,7 @@ use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData}; use sc_consensus::BlockImport; use sp_runtime::traits::Block as BlockT; -mod parachain_consensus; +pub mod parachain_consensus; #[cfg(test)] mod tests; pub use parachain_consensus::run_parachain_consensus; diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index bfd765d6fee..0a866235a60 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = # Cumulus cumulus-relay-chain-interface = { path = "../relay-chain-interface" } +cumulus-client-consensus-common = { path = "../consensus/common" } [dev-dependencies] portpicker = "0.1.1" diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 7e906da1b1d..1a119b68415 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -35,10 +35,10 @@ use polkadot_primitives::v2::{ }; use codec::{Decode, DecodeAll, Encode}; -use futures::{channel::oneshot, future::FutureExt, Future}; +use futures::{channel::oneshot, future::FutureExt, Future, StreamExt}; +use cumulus_client_consensus_common::parachain_consensus::RelaychainClient; use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc}; - #[cfg(test)] mod tests; @@ -454,3 +454,72 @@ async fn wait_to_announce( ); } } + +#[derive(Clone)] +pub struct WaitForParachainTargetBlock { + phantom: PhantomData, +} + +impl WaitForParachainTargetBlock { + /// Get warp sync target block + pub async fn warp_sync_get( + para_id: ParaId, + relay_chain_interface: Arc, + ) -> Result, BoxedError> + where + Block: BlockT + 'static, + { + let (sender, receiver) = oneshot::channel::(); + Self::wait_for_target_block(sender, para_id, relay_chain_interface).await; + return Ok(receiver) + } + + async fn wait_for_target_block( + sender: oneshot::Sender, + para_id: ParaId, + relay_chain_interface: Arc, + ) { + let is_syncing = relay_chain_interface + .is_major_syncing() + .await + .map_err(|e| { + tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e) + }) + .unwrap_or(false); + + loop { + if !is_syncing { + let mut finalized_heads = match relay_chain_interface.finalized_heads(para_id).await + { + Ok(finalized_heads_stream) => finalized_heads_stream, + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); + return + }, + }; + + let finalized_head = if let Some(h) = finalized_heads.next().await { + h + } else { + tracing::debug!(target: "cumulus-network", "Stopping following finalized head."); + return + }; + + let target_header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: "cumulus-network", + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + continue + }, + }; + + let _ = sender.send(target_header); + break + } + } + } +} diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 9a3b1d9bbc8..87a6161e8ba 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -31,6 +31,7 @@ use sp_keystore::SyncCryptoStorePtr; use substrate_prometheus_endpoint::Registry; use polkadot_service::CollatorPair; +use sc_network_common::sync::warp::WarpSyncParams; /// Native executor type. pub struct ParachainNativeExecutor; @@ -197,6 +198,17 @@ async fn start_node_impl( let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let warp_sync_params = + match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( + id, + relay_chain_interface.clone(), + ) + .await + { + Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)), + _ => None, + }; + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, @@ -207,7 +219,7 @@ async fn start_node_impl( block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), - warp_sync_params: None, + warp_sync_params, })?; if parachain_config.offchain_worker.enabled { diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 0dc7f684023..aa19d154f03 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -47,7 +47,7 @@ use sc_consensus::{ }; use sc_executor::WasmExecutor; use sc_network::NetworkService; -use sc_network_common::service::NetworkBlock; +use sc_network_common::{service::NetworkBlock, sync::warp::WarpSyncParams}; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::{ApiExt, ConstructRuntimeApi}; @@ -375,6 +375,16 @@ where let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let warp_sync_params = + match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( + para_id, + relay_chain_interface.clone(), + ) + .await + { + Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)), + _ => None, + }; let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, @@ -385,7 +395,7 @@ where block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), - warp_sync_params: None, + warp_sync_params, })?; let rpc_client = client.clone(); @@ -558,6 +568,16 @@ where let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let warp_sync_params = + match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( + para_id, + relay_chain_interface.clone(), + ) + .await + { + Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)), + _ => None, + }; let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, @@ -568,7 +588,7 @@ where block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), - warp_sync_params: None, + warp_sync_params, })?; let rpc_builder = { @@ -1327,6 +1347,16 @@ where let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let warp_sync_params = + match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( + para_id, + relay_chain_interface.clone(), + ) + .await + { + Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)), + _ => None, + }; let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, @@ -1337,7 +1367,7 @@ where block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), - warp_sync_params: None, + warp_sync_params, })?; let rpc_builder = { diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index e36c73215d5..48b99ee21df 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -49,7 +49,9 @@ use polkadot_primitives::v2::{CollatorPair, Hash as PHash, PersistedValidationDa use polkadot_service::ProvideRuntimeApi; use sc_client_api::execution_extensions::ExecutionStrategies; use sc_network::{multiaddr, NetworkBlock, NetworkService}; -use sc_network_common::{config::TransportConfig, service::NetworkStateInfo}; +use sc_network_common::{ + config::TransportConfig, service::NetworkStateInfo, sync::warp::WarpSyncParams, +}; use sc_service::{ config::{ BlocksPruning, DatabaseSource, KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration, @@ -268,6 +270,16 @@ where BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id); let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>; + let warp_sync_params = + match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( + para_id, + relay_chain_interface.clone(), + ) + .await + { + Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)), + _ => None, + }; let prometheus_registry = parachain_config.prometheus_registry().cloned(); let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); let (network, system_rpc_tx, tx_handler_controller, start_network) = @@ -278,7 +290,7 @@ where spawn_handle: task_manager.spawn_handle(), import_queue: import_queue.clone(), block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)), - warp_sync_params: None, + warp_sync_params, })?; let rpc_builder = {