Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of polling in WarpSync #1265

Merged
merged 11 commits into from
Sep 5, 2023
2 changes: 1 addition & 1 deletion polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
}: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
use polkadot_node_network_protocol::request_response::IncomingRequest;
use sc_network_common::sync::warp::WarpSyncParams;
use sc_network_sync::warp::WarpSyncParams;

let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled;
let role = config.role.clone();
Expand Down
3 changes: 1 addition & 2 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::{event::Event, NetworkEventStream, NetworkService};
use sc_network_common::sync::warp::WarpSyncParams;
use sc_network_sync::SyncingService;
use sc_network_sync::{warp::WarpSyncParams, SyncingService};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_statement_store::Store as StatementStore;
use sc_telemetry::{Telemetry, TelemetryWorker};
Expand Down
13 changes: 1 addition & 12 deletions substrate/client/network/common/src/sync/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use codec::{Decode, Encode};
use futures::channel::oneshot;
pub use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{fmt, sync::Arc};
use std::fmt;

/// Scale-encoded warp sync proof response.
///
/// Opaque SCALE-encoded bytes of a warp sync proof; decoding/verification is
/// delegated to the `WarpSyncProvider` implementation.
pub struct EncodedProof(pub Vec<u8>);
Expand All @@ -30,16 +29,6 @@ pub struct WarpProofRequest<B: BlockT> {
pub begin: B::Hash,
}

/// The different types of warp syncing.
pub enum WarpSyncParams<Block: BlockT> {
/// Standard warp sync for the chain.
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
/// Skip downloading proofs and wait for a header of the state that should be downloaded.
///
/// It is expected that the header provider ensures that the header is trusted.
WaitForTarget(oneshot::Receiver<<Block as BlockT>::Header>),
}

/// Proof verification result.
pub enum VerificationResult<Block: BlockT> {
/// Proof is valid, but the target was not reached.
Expand Down
118 changes: 88 additions & 30 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
ChainSync, ClientError, SyncingService,
};

use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::PeerId;
use prometheus_endpoint::{
Expand All @@ -47,7 +52,6 @@ use sc_network_common::{
role::Roles,
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent,
},
};
Expand All @@ -67,6 +71,9 @@ use std::{
time::{Duration, Instant},
};

/// Log target for this file.
// NOTE: `'static` is implied for string literals in `const` items, so the
// explicit lifetime was redundant (clippy: `redundant_static_lifetimes`).
const LOG_TARGET: &str = "sync";

/// Interval at which we perform time based maintenance
// Deliberately slightly over one second so ticks drift relative to
// one-second-aligned events — TODO confirm intent with original authors.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

Expand Down Expand Up @@ -251,6 +258,10 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,

/// A channel to get target block header if we skip over proofs downloading during warp sync.
warp_sync_target_block_header_rx:
Fuse<BoxFuture<'static, Result<B::Header, oneshot::Canceled>>>,

/// Protocol name used for block announcements
block_announce_protocol_name: ProtocolName,

Expand Down Expand Up @@ -299,7 +310,11 @@ where
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {}",
crate::MAX_BLOCKS_IN_RESPONSE,
);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
Expand Down Expand Up @@ -352,6 +367,19 @@ where
total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
};

// Split warp sync params into warp sync config and a channel to retrieve target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});

// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

let (chain_sync, block_announce_config) = ChainSync::new(
mode,
client.clone(),
Expand All @@ -360,7 +388,7 @@ where
roles,
max_parallel_downloads,
max_blocks_per_request,
warp_sync_params,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
Expand Down Expand Up @@ -404,6 +432,7 @@ where
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
warp_sync_target_block_header_rx,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
Expand All @@ -418,7 +447,7 @@ where
match Metrics::register(r, is_major_syncing.clone()) {
Ok(metrics) => Some(metrics),
Err(err) => {
log::error!(target: "sync", "Failed to register metrics {err:?}");
log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
None
},
}
Expand Down Expand Up @@ -510,7 +539,10 @@ where
let peer = match self.peers.get_mut(&peer_id) {
Some(p) => p,
None => {
log::error!(target: "sync", "Received block announce from disconnected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Received block announce from disconnected peer {peer_id}",
);
debug_assert!(false);
return
},
Expand All @@ -536,11 +568,11 @@ where
let header = match self.client.header(hash) {
Ok(Some(header)) => header,
Ok(None) => {
log::warn!(target: "sync", "Trying to announce unknown block: {}", hash);
log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
return
},
Err(e) => {
log::warn!(target: "sync", "Error reading block header {}: {}", hash, e);
log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
return
},
};
Expand All @@ -551,7 +583,7 @@ where
}

let is_best = self.client.info().best_hash == hash;
log::debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best);
log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");

let data = data
.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
Expand All @@ -560,7 +592,7 @@ where
for (peer_id, ref mut peer) in self.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
log::trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
let message = BlockAnnounce {
header: header.clone(),
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
Expand All @@ -575,7 +607,7 @@ where

/// Inform sync about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
log::debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);
log::debug!(target: LOG_TARGET, "New best block imported {hash:?}/#{number}");

self.chain_sync.update_chain_info(&hash, number);
self.network_service.set_notification_handshake(
Expand Down Expand Up @@ -619,7 +651,10 @@ where
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(target: "sync", "syncing has halted due to inactivity, evicting all peers");
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);

for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
Expand Down Expand Up @@ -658,7 +693,10 @@ where
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer_id, hash);
log::info!(
target: LOG_TARGET,
"💔 Invalid justification provided by {peer_id} for #{hash}",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
Expand Down Expand Up @@ -723,7 +761,7 @@ where
},
Err(()) => {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
Expand All @@ -732,7 +770,7 @@ where
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
target: LOG_TARGET,
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
Expand All @@ -749,9 +787,8 @@ where
}
} else {
log::trace!(
target: "sync",
"Received sync for peer earlier refused by sync layer: {}",
remote
target: LOG_TARGET,
"Received sync for peer earlier refused by sync layer: {remote}",
);
}
}
Expand All @@ -764,6 +801,21 @@ where
}
}

// Retrieve warp sync target block header just before polling `ChainSync`
// to make progress as soon as we receive it.
match self.warp_sync_target_block_header_rx.poll_unpin(cx) {
Poll::Ready(Ok(target)) => {
self.chain_sync.set_warp_sync_target_block(target);
},
Poll::Ready(Err(err)) => {
log::error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
},
Poll::Pending => {},
}

// Drive `ChainSync`.
while let Poll::Ready(()) = self.chain_sync.poll(cx) {}

Expand All @@ -784,9 +836,9 @@ where
pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> {
if let Some(info) = self.peers.remove(&peer_id) {
if self.important_peers.contains(&peer_id) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer_id);
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
} else {
log::debug!(target: "sync", "{} disconnected", peer_id);
log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
}

if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
Expand All @@ -798,7 +850,7 @@ where
},
None => {
log::error!(
target: "sync",
target: LOG_TARGET,
"trying to disconnect an inbound node which is not counted as inbound"
);
debug_assert!(false);
Expand Down Expand Up @@ -828,10 +880,13 @@ where
sink: NotificationsSink,
inbound: bool,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", peer_id, status);
log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");

if self.peers.contains_key(&peer_id) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Called on_sync_peer_connected with already connected peer {peer_id}",
);
debug_assert!(false);
return Err(())
}
Expand All @@ -841,23 +896,23 @@ where

if self.important_peers.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
status.genesis_hash,
);
} else if self.boot_node_ids.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
status.genesis_hash,
);
} else {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
Expand All @@ -874,7 +929,10 @@ where
status.roles.is_full() &&
inbound && self.num_in_peers == self.max_in_peers
{
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {peer_id}");
log::debug!(
target: LOG_TARGET,
"All inbound slots have been consumed, rejecting {peer_id}",
);
return Err(())
}

Expand All @@ -884,15 +942,15 @@ where
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
{
log::debug!(target: "sync", "Too many full nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
return Err(())
}

if status.roles.is_light() &&
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
log::debug!(target: "sync", "Too many light nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
return Err(())
}

Expand Down Expand Up @@ -921,7 +979,7 @@ where
None
};

log::debug!(target: "sync", "Connected {}", peer_id);
log::debug!(target: LOG_TARGET, "Connected {peer_id}");

self.peers.insert(peer_id, peer);

Expand Down
Loading