Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
wait for relay chain to sync then get parachain header
Browse files Browse the repository at this point in the history
  • Loading branch information
samelamin committed Nov 27, 2022
1 parent dd25357 commit 4b6bfb2
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
use sc_consensus::BlockImport;
use sp_runtime::traits::Block as BlockT;

mod parachain_consensus;
pub mod parachain_consensus;
#[cfg(test)]
mod tests;
pub use parachain_consensus::run_parachain_consensus;
Expand Down
1 change: 1 addition & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =

# Cumulus
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
cumulus-client-consensus-common = { path = "../consensus/common" }

[dev-dependencies]
portpicker = "0.1.1"
Expand Down
73 changes: 71 additions & 2 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use polkadot_primitives::v2::{
};

use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, future::FutureExt, Future};
use futures::{channel::oneshot, future::FutureExt, Future, StreamExt};

use cumulus_client_consensus_common::parachain_consensus::RelaychainClient;
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -454,3 +454,72 @@ async fn wait_to_announce<Block: BlockT>(
);
}
}

#[derive(Clone)]
pub struct WaitForParachainTargetBlock<Block> {
phantom: PhantomData<Block>,
}

impl<Block: BlockT> WaitForParachainTargetBlock<Block> {
/// Get warp sync target block
pub async fn warp_sync_get(
para_id: ParaId,
relay_chain_interface: Arc<dyn RelayChainInterface>,
) -> Result<oneshot::Receiver<Block::Header>, BoxedError>
where
Block: BlockT + 'static,
{
let (sender, receiver) = oneshot::channel::<Block::Header>();
Self::wait_for_target_block(sender, para_id, relay_chain_interface).await;
return Ok(receiver)
}

async fn wait_for_target_block(
sender: oneshot::Sender<Block::Header>,
para_id: ParaId,
relay_chain_interface: Arc<dyn RelayChainInterface>,
) {
let is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| {
tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
})
.unwrap_or(false);

loop {
if !is_syncing {
let mut finalized_heads = match relay_chain_interface.finalized_heads(para_id).await
{
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: "cumulus-network", "Stopping following finalized head.");
return
};

let target_header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: "cumulus-network",
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue
},
};

let _ = sender.send(target_header);
break
}
}
}
}
14 changes: 13 additions & 1 deletion parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use sp_keystore::SyncCryptoStorePtr;
use substrate_prometheus_endpoint::Registry;

use polkadot_service::CollatorPair;
use sc_network_common::sync::warp::WarpSyncParams;

/// Native executor type.
pub struct ParachainNativeExecutor;
Expand Down Expand Up @@ -197,6 +198,17 @@ async fn start_node_impl(
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let warp_sync_params =
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
id,
relay_chain_interface.clone(),
)
.await
{
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
_ => None,
};

let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
Expand All @@ -207,7 +219,7 @@ async fn start_node_impl(
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync_params: None,
warp_sync_params,
})?;

if parachain_config.offchain_worker.enabled {
Expand Down
38 changes: 34 additions & 4 deletions polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use sc_consensus::{
};
use sc_executor::WasmExecutor;
use sc_network::NetworkService;
use sc_network_common::service::NetworkBlock;
use sc_network_common::{service::NetworkBlock, sync::warp::WarpSyncParams};
use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
use sp_api::{ApiExt, ConstructRuntimeApi};
Expand Down Expand Up @@ -375,6 +375,16 @@ where
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let warp_sync_params =
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
)
.await
{
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
_ => None,
};
let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
Expand All @@ -385,7 +395,7 @@ where
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync_params: None,
warp_sync_params,
})?;

let rpc_client = client.clone();
Expand Down Expand Up @@ -558,6 +568,16 @@ where
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let warp_sync_params =
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
)
.await
{
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
_ => None,
};
let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
Expand All @@ -568,7 +588,7 @@ where
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync_params: None,
warp_sync_params,
})?;

let rpc_builder = {
Expand Down Expand Up @@ -1327,6 +1347,16 @@ where
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let warp_sync_params =
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
)
.await
{
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
_ => None,
};
let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
Expand All @@ -1337,7 +1367,7 @@ where
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync_params: None,
warp_sync_params,
})?;

let rpc_builder = {
Expand Down
16 changes: 14 additions & 2 deletions test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use polkadot_primitives::v2::{CollatorPair, Hash as PHash, PersistedValidationDa
use polkadot_service::ProvideRuntimeApi;
use sc_client_api::execution_extensions::ExecutionStrategies;
use sc_network::{multiaddr, NetworkBlock, NetworkService};
use sc_network_common::{config::TransportConfig, service::NetworkStateInfo};
use sc_network_common::{
config::TransportConfig, service::NetworkStateInfo, sync::warp::WarpSyncParams,
};
use sc_service::{
config::{
BlocksPruning, DatabaseSource, KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration,
Expand Down Expand Up @@ -268,6 +270,16 @@ where
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);
let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>;

let warp_sync_params =
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
para_id,
relay_chain_interface.clone(),
)
.await
{
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
_ => None,
};
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
let (network, system_rpc_tx, tx_handler_controller, start_network) =
Expand All @@ -278,7 +290,7 @@ where
spawn_handle: task_manager.spawn_handle(),
import_queue: import_queue.clone(),
block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)),
warp_sync_params: None,
warp_sync_params,
})?;

let rpc_builder = {
Expand Down

0 comments on commit 4b6bfb2

Please sign in to comment.