Skip to content

Commit

Permalink
Remove the need to wait for target block header in warp sync implemen…
Browse files Browse the repository at this point in the history
…tation (#5431)

I'm not sure if this is exactly what
#3537 meant, but I
think it should be fine to wait for relay chain before initializing
parachain node fully, which removed the need for background task and
extra hacks throughout the stack just to know where warp sync should
start.

Previously there were both `WarpSyncParams` and `WarpSyncConfig`, but
there was no longer any point in having two data structures, so I
simplified it to just `WarpSyncConfig`.

Fixes #3537
  • Loading branch information
nazar-pc authored Aug 23, 2024
1 parent b2ec017 commit 6d819a6
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 304 deletions.
91 changes: 32 additions & 59 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::{
build_minimal_relay_chain_node_light_client, build_minimal_relay_chain_node_with_rpc,
};
use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use futures::{channel::mpsc, StreamExt};
use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption};
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
Expand All @@ -43,7 +40,7 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncParams};
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -467,12 +464,19 @@ where
{
let warp_sync_params = match parachain_config.network.sync_mode {
SyncMode::Warp => {
let target_block = warp_sync_get::<Block, RCInterface>(
para_id,
relay_chain_interface.clone(),
spawn_handle.clone(),
);
Some(WarpSyncParams::WaitForTarget(target_block))
log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");

let target_block =
wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
.await
.inspect_err(|e| {
log::error!(
target: LOG_TARGET_SYNC,
"Unable to determine parachain target block {:?}",
e
);
})?;
Some(WarpSyncConfig::WithTarget(target_block))
},
_ => None,
};
Expand Down Expand Up @@ -500,67 +504,37 @@ where
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
warp_sync_params,
warp_sync_config: warp_sync_params,
block_relay: None,
metrics,
})
}

/// Creates a new background task to wait for the relay chain to sync up and retrieve the parachain
/// header
fn warp_sync_get<B, RCInterface>(
para_id: ParaId,
relay_chain_interface: RCInterface,
spawner: SpawnTaskHandle,
) -> oneshot::Receiver<<B as BlockT>::Header>
where
B: BlockT + 'static,
RCInterface: RelayChainInterface + 'static,
{
let (sender, receiver) = oneshot::channel::<B::Header>();
spawner.spawn(
"cumulus-parachain-wait-for-target-block",
None,
async move {
log::debug!(
target: LOG_TARGET_SYNC,
"waiting for announce block in a background task...",
);

let _ = wait_for_finalized_para_head::<B, _>(sender, para_id, relay_chain_interface)
.await
.map_err(|e| {
log::error!(
target: LOG_TARGET_SYNC,
"Unable to determine parachain target block {:?}",
e
)
});
}
.boxed(),
);

receiver
}

/// Waits for the relay chain to have finished syncing and then gets the parachain header that
/// corresponds to the last finalized relay chain block.
async fn wait_for_finalized_para_head<B, RCInterface>(
sender: oneshot::Sender<<B as BlockT>::Header>,
para_id: ParaId,
relay_chain_interface: RCInterface,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
) -> sc_service::error::Result<<B as BlockT>::Header>
where
B: BlockT + 'static,
RCInterface: RelayChainInterface + Send + 'static,
{
let mut imported_blocks = relay_chain_interface.import_notification_stream().await?.fuse();
while imported_blocks.next().await.is_some() {
let is_syncing = relay_chain_interface.is_major_syncing().await.map_err(|e| {
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Unable to determine sync status. {e}"
let mut imported_blocks = relay_chain_interface
.import_notification_stream()
.await
.map_err(|error| {
sc_service::Error::Other(format!(
"Relay chain import notification stream error when waiting for parachain head: \
{error}"
))
})?;
})?
.fuse();
while imported_blocks.next().await.is_some() {
let is_syncing = relay_chain_interface
.is_major_syncing()
.await
.map_err(|e| format!("Unable to determine sync status: {e}"))?;

if !is_syncing {
let relay_chain_best_hash = relay_chain_interface
Expand All @@ -586,8 +560,7 @@ where
finalized_header.number(),
finalized_header.hash()
);
let _ = sender.send(finalized_header);
return Ok(())
return Ok(finalized_header)
}
}

Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ pub fn new_full<
) -> Result<NewFull, Error> {
use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD;
use polkadot_node_network_protocol::request_response::IncomingRequest;
use sc_network_sync::WarpSyncParams;
use sc_network_sync::WarpSyncConfig;

let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled;
let role = config.role.clone();
Expand Down Expand Up @@ -1037,7 +1037,7 @@ pub fn new_full<
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
})?;
Expand Down
20 changes: 20 additions & 0 deletions prdoc/pr_5431.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
title: Remove the need to wait for target block header in warp sync implementation

doc:
- audience: Node Dev
description: |
Previously warp sync needed to wait for target block header of the relay chain to become available before warp
sync can start, which resulted in cumbersome APIs. Parachain initialization was refactored to initialize warp sync
with target block header from the very beginning, improving and simplifying sync API.

crates:
- name: sc-service
bump: major
- name: sc-network-sync
bump: major
- name: polkadot-service
bump: major
- name: cumulus-client-service
bump: major
- name: sc-informant
bump: major
4 changes: 2 additions & 2 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
event::Event, service::traits::NetworkService, NetworkBackend, NetworkEventStream,
};
use sc_network_sync::{strategy::warp::WarpSyncParams, SyncingService};
use sc_network_sync::{strategy::warp::WarpSyncConfig, SyncingService};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_statement_store::Store as StatementStore;
use sc_telemetry::{Telemetry, TelemetryWorker};
Expand Down Expand Up @@ -517,7 +517,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
})?;
Expand Down
10 changes: 1 addition & 9 deletions substrate/client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,9 @@ impl<B: BlockT> InformantDisplay<B> {
_,
Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }),
) if !sync_status.is_major_syncing() => ("⏩", "Block history".into(), format!(", #{}", n)),
(
_,
_,
Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingTargetBlock, .. }),
) => ("⏩", "Waiting for pending target block".into(), "".into()),
// Handle all phases besides the two phases we already handle above.
(_, _, Some(warp))
if !matches!(
warp.phase,
WarpSyncPhase::AwaitingTargetBlock | WarpSyncPhase::DownloadingBlocks(_)
) =>
if !matches!(warp.phase, WarpSyncPhase::DownloadingBlocks(_)) =>
(
"⏩",
"Warping".into(),
Expand Down
55 changes: 3 additions & 52 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
syncing_service::{SyncingService, ToServiceCommand},
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
warp::{EncodedProof, WarpProofRequest, WarpSyncConfig},
StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
Expand All @@ -42,11 +42,7 @@ use crate::{
};

use codec::{Decode, DecodeAll, Encode};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
use futures::{channel::oneshot, FutureExt, StreamExt};
use libp2p::request_response::OutboundFailure;
use log::{debug, error, trace, warn};
use prometheus_endpoint::{
Expand Down Expand Up @@ -257,10 +253,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,

/// A channel to get target block header if we skip over proofs downloading during warp sync.
warp_sync_target_block_header_rx_fused:
Fuse<BoxFuture<'static, Result<B::Header, oneshot::Canceled>>>,

/// Protocol name used for block announcements
block_announce_protocol_name: ProtocolName,

Expand Down Expand Up @@ -309,7 +301,7 @@ where
protocol_id: ProtocolId,
fork_id: &Option<String>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
warp_sync_params: Option<WarpSyncParams<B>>,
warp_sync_config: Option<WarpSyncConfig<B>>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_downloader: Arc<dyn BlockDownloader<B>>,
Expand Down Expand Up @@ -404,19 +396,6 @@ where
Arc::clone(&peer_store_handle),
);

// Split warp sync params into warp sync config and a channel to retrieve target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});

// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

Expand Down Expand Up @@ -460,7 +439,6 @@ where
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
warp_sync_target_block_header_rx_fused,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
Expand Down Expand Up @@ -635,17 +613,6 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
// TODO: setting of warp sync target block should be moved to the initialization of
// `SyncingEngine`, see https://github.com/paritytech/polkadot-sdk/issues/3537.
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
Expand Down Expand Up @@ -898,22 +865,6 @@ where
}
}

fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}

/// Called by peer when it is disconnecting.
///
/// Returns a result if the handshake of this peer was indeed accepted.
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! Blockchain syncing implementation in Substrate.
pub use service::syncing_service::SyncingService;
pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress};
pub use strategy::warp::{WarpSyncConfig, WarpSyncPhase, WarpSyncProgress};
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
Expand Down
Loading

0 comments on commit 6d819a6

Please sign in to comment.