From f72590668a166eb8c7c3c7534ecabf481781891d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 29 Aug 2023 17:59:44 +0300 Subject: [PATCH 1/7] Get rid of polling in `WarpSync` --- .../client/network/common/src/sync/warp.rs | 23 ++++++++ substrate/client/network/sync/src/engine.rs | 35 ++++++++++- substrate/client/network/sync/src/lib.rs | 37 ++++++++---- substrate/client/network/sync/src/warp.rs | 59 ++++++++----------- 4 files changed, 106 insertions(+), 48 deletions(-) diff --git a/substrate/client/network/common/src/sync/warp.rs b/substrate/client/network/common/src/sync/warp.rs index 37a6e62c53b4..043d77676412 100644 --- a/substrate/client/network/common/src/sync/warp.rs +++ b/substrate/client/network/common/src/sync/warp.rs @@ -40,6 +40,29 @@ pub enum WarpSyncParams { WaitForTarget(oneshot::Receiver<::Header>), } +/// Warp sync configuration. +pub enum WarpSyncConfig { + /// Standard warp sync for the chain. + WithProvider(Arc>), + /// Skip downloading proofs and wait for a header of the state that should be downloaded. + /// + /// It is expected that the header provider ensures that the header is trusted. + WaitForTarget, +} + +impl WarpSyncParams { + /// Split `WarpSyncParams` into `WarpSyncConfig` and warp sync target block header receiver. + pub fn split( + self, + ) -> (WarpSyncConfig, Option::Header>>) { + match self { + WarpSyncParams::WithProvider(provider) => + (WarpSyncConfig::WithProvider(provider), None), + WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)), + } + } +} + /// Proof verification result. pub enum VerificationResult { /// Proof is valid, but the target was not reached. diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index d5c4957ab3d7..75e5c7a0ccb6 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -25,7 +25,7 @@ use crate::{ }; use codec::{Decode, Encode}; -use futures::{FutureExt, StreamExt}; +use futures::{channel::oneshot, FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use prometheus_endpoint::{ @@ -245,6 +245,9 @@ pub struct SyncingEngine { /// The `PeerId`'s of all boot nodes. boot_node_ids: HashSet, + /// A channel to get target block header if we skip over proofs downloading during warp sync. + warp_sync_target_block_header_rx: Option::Header>>, + /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, @@ -346,6 +349,14 @@ where total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize }; + // Split warp sync params into warp sync config and a channel to retreive target block + // header. 
+ let (warp_sync_config, warp_sync_target_block_header_rx) = + warp_sync_params.map_or((None, None), |p| { + let (config, target_block_rx) = p.split(); + (Some(config), target_block_rx) + }); + let (chain_sync, block_announce_config) = ChainSync::new( mode, client.clone(), @@ -355,7 +366,7 @@ where block_announce_validator, max_parallel_downloads, max_blocks_per_request, - warp_sync_params, + warp_sync_config, metrics_registry, network_service.clone(), import_queue, @@ -396,6 +407,7 @@ where genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), + warp_sync_target_block_header_rx, boot_node_ids, default_peers_set_no_slot_peers, default_peers_set_num_full, @@ -770,6 +782,25 @@ where } } + // Retreive warp sync target block header just before polling `ChainSync` + // to make progress as soon as we receive it. + if let Some(ref mut target_block_header_rx) = self.warp_sync_target_block_header_rx { + match target_block_header_rx.poll_unpin(cx) { + Poll::Ready(Ok(target)) => { + self.chain_sync.set_warp_sync_target_block(target); + self.warp_sync_target_block_header_rx = None; + }, + Poll::Ready(Err(err)) => { + log::error!( + target: "sync", + "Failed to get target block for warp sync. Error: {err:?}", + ); + self.warp_sync_target_block_header_rx = None; + }, + Poll::Pending => {}, + } + } + // poll `ChainSync` last because of a block announcement was received through the // event stream between `SyncingEngine` and `Protocol` and the validation finished // right after it as queued, the resulting block request (if any) can be sent right away. diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 175c1c43f46f..9ffc3e968e81 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -63,7 +63,7 @@ use sc_network_common::{ BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }, - warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress}, + warp::{EncodedProof, WarpProofRequest, WarpSyncConfig, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode, @@ -324,10 +324,12 @@ pub struct ChainSync { state_sync: Option>, /// Warp sync in progress, if any. warp_sync: Option>, - /// Warp sync params. + /// Warp sync configuration. /// /// Will be `None` after `self.warp_sync` is `Some(_)`. - warp_sync_params: Option>, + warp_sync_config: Option>, + /// A temporary storage for warp sync target block until warp sync is initialized. + warp_sync_target_block_header: Option, /// Enable importing existing blocks. This is used used after the state download to /// catch up to the latest state while re-importing blocks. 
import_existing: bool, @@ -646,8 +648,13 @@ where if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { log::debug!(target: "sync", "Starting warp state sync."); - if let Some(params) = self.warp_sync_params.take() { - self.warp_sync = Some(WarpSync::new(self.client.clone(), params)); + + if let Some(config) = self.warp_sync_config.take() { + let mut warp_sync = WarpSync::new(self.client.clone(), config); + if let Some(header) = self.warp_sync_target_block_header.take() { + warp_sync.set_target_block(header); + } + self.warp_sync = Some(warp_sync); } } } @@ -1323,12 +1330,6 @@ where &mut self, cx: &mut std::task::Context, ) -> Poll> { - // Should be called before `process_outbound_requests` to ensure - // that a potential target block is directly leading to requests. - if let Some(warp_sync) = &mut self.warp_sync { - let _ = warp_sync.poll(cx); - } - self.process_outbound_requests(); while let Poll::Ready(result) = self.poll_pending_responses(cx) { @@ -1398,7 +1399,7 @@ where block_announce_validator: Box + Send>, max_parallel_downloads: u32, max_blocks_per_request: u32, - warp_sync_params: Option>, + warp_sync_config: Option>, metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, @@ -1443,7 +1444,8 @@ where network_service, block_request_protocol_name, state_request_protocol_name, - warp_sync_params, + warp_sync_config, + warp_sync_target_block_header: None, warp_sync_protocol_name, block_announce_protocol_name: block_announce_config .notifications_protocol @@ -1896,6 +1898,15 @@ where .collect() } + /// Set warp sync target block externally in case we skip warp proof downloading. + pub fn set_warp_sync_target_block(&mut self, header: B::Header) { + if let Some(ref mut warp_sync) = self.warp_sync { + warp_sync.set_target_block(header); + } else { + self.warp_sync_target_block_header = Some(header); + } + } + /// Generate block request for downloading of the target block body during warp sync. fn warp_target_block_request(&mut self) -> Option<(PeerId, BlockRequest)> { let sync = &self.warp_sync.as_ref()?; diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index 912ad78dfdd0..176981321f61 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -19,36 +19,41 @@ //! Warp sync support. use crate::{ - oneshot, schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; -use futures::FutureExt; use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock}, warp::{ - EncodedProof, VerificationResult, WarpProofRequest, WarpSyncParams, WarpSyncPhase, + EncodedProof, VerificationResult, WarpProofRequest, WarpSyncConfig, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider, }, }; use sp_blockchain::HeaderBackend; use sp_consensus_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; -use std::{sync::Arc, task::Poll}; +use std::sync::Arc; +/// Log target for this file. +const LOG_TARGET: &'static str = "sync"; + +/// Warp sync phase. enum Phase { + /// Downloading warp proofs. 
WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash, warp_sync_provider: Arc>, }, - PendingTargetBlock { - target_block: Option>, - }, + /// Waiting for target block to be set externally if we skip warp proofs downloading, + /// and start straight from the target block (used by parachains warp sync). + PendingTargetBlock, + /// Downloading target block. TargetBlock(B::Header), + /// Downloading state. State(StateSync), } @@ -83,10 +88,10 @@ where /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. - pub fn new(client: Arc, warp_sync_params: WarpSyncParams) -> Self { + pub fn new(client: Arc, warp_sync_config: WarpSyncConfig) -> Self { let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists"); - match warp_sync_params { - WarpSyncParams::WithProvider(warp_sync_provider) => { + match warp_sync_config { + WarpSyncConfig::WithProvider(warp_sync_provider) => { let phase = Phase::WarpProof { set_id: 0, authorities: warp_sync_provider.current_authorities(), @@ -95,35 +100,23 @@ where }; Self { client, phase, total_proof_bytes: 0 } }, - WarpSyncParams::WaitForTarget(block) => Self { - client, - phase: Phase::PendingTargetBlock { target_block: Some(block) }, - total_proof_bytes: 0, - }, + WarpSyncConfig::WaitForTarget => + Self { client, phase: Phase::PendingTargetBlock, total_proof_bytes: 0 }, } } - /// Poll to make progress. - /// - /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was - /// sent. - pub fn poll(&mut self, cx: &mut std::task::Context) { - let new_phase = if let Phase::PendingTargetBlock { target_block: Some(target_block) } = - &mut self.phase - { - match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target)) => Phase::TargetBlock(target), - Poll::Ready(Err(e)) => { - error!(target: "sync", "Failed to get target block. Error: {:?}",e); - Phase::PendingTargetBlock { target_block: None } - }, - _ => return, - } - } else { + /// Set target block externally in case we skip warp proof downloading. + pub fn set_target_block(&mut self, header: B::Header) { + let Phase::PendingTargetBlock = self.phase else { + error!( + target: LOG_TARGET, + "Attempt to set warp sync target block in invalide phase.", + ); + debug_assert!(false); return }; - self.phase = new_phase; + self.phase = Phase::TargetBlock(header); } /// Validate and import a state response. 
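[Editor's note — illustrative sketch, not part of the patch series.] The first patch replaces `WarpSync`'s internal polling with a `WarpSyncParams::split()` step: the user-facing params are separated into a `WarpSyncConfig` for the `WarpSync` state machine and an optional oneshot receiver that `SyncingEngine` polls, forwarding the resolved header via `set_warp_sync_target_block`. The sketch below reproduces that pattern in a minimal, self-contained form. `Header` and `Provider` are simplified stand-ins for the real generics (`B::Header`, `Arc<dyn WarpSyncProvider<B>>`), and only the `futures` crate is required; this is an assumption-laden illustration of the split, not the actual Substrate code.

```rust
// Minimal sketch of the `WarpSyncParams::split()` pattern (stand-in types, not the real API).
use std::sync::Arc;

use futures::channel::oneshot;

#[derive(Debug)]
struct Header(u64); // stand-in for `B::Header`

#[allow(dead_code)]
trait Provider: Send + Sync {} // stand-in for `WarpSyncProvider<B>`

#[allow(dead_code)]
enum WarpSyncParams {
    /// Standard warp sync driven by a proof provider.
    WithProvider(Arc<dyn Provider>),
    /// Skip proofs; the target block header arrives later over a channel.
    WaitForTarget(oneshot::Receiver<Header>),
}

#[allow(dead_code)]
enum WarpSyncConfig {
    WithProvider(Arc<dyn Provider>),
    WaitForTarget,
}

impl WarpSyncParams {
    /// Split the user-facing params into the state-machine config and the
    /// target-block receiver that the engine polls on behalf of `WarpSync`.
    fn split(self) -> (WarpSyncConfig, Option<oneshot::Receiver<Header>>) {
        match self {
            WarpSyncParams::WithProvider(p) => (WarpSyncConfig::WithProvider(p), None),
            WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)),
        }
    }
}

fn main() {
    // Parachain-style setup: the trusted target header is delivered later.
    let (tx, rx) = oneshot::channel::<Header>();
    let (config, target_rx) = WarpSyncParams::WaitForTarget(rx).split();

    // `config` goes to the sync state machine, which no longer polls anything itself;
    // the receiver stays with the engine, which forwards the header once it resolves.
    assert!(matches!(config, WarpSyncConfig::WaitForTarget));
    assert!(target_rx.is_some());

    // Simulate the header provider resolving the target block.
    tx.send(Header(42)).unwrap();
    let header = futures::executor::block_on(target_rx.unwrap()).unwrap();
    println!("engine would call set_warp_sync_target_block({header:?})");
}
```

The design point of the split is that `WarpSync` itself keeps no future to poll; the single async dependency (the target-block channel) stays in the engine's poll loop, which is what allows the later patches to simply fuse that receiver instead of threading a `poll` method through the sync state machine.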
From 5935bd5054fd54a872f8ca519c1f80d1fe391d72 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 30 Aug 2023 17:38:44 +0300 Subject: [PATCH 2/7] Move `WarpSyncParams` & `WarpSyncConfig` to `sc-network-sync` --- polkadot/node/service/src/lib.rs | 2 +- substrate/bin/node/cli/src/service.rs | 3 +- .../client/network/common/src/sync/warp.rs | 33 ---------------- substrate/client/network/sync/src/engine.rs | 2 +- substrate/client/network/sync/src/lib.rs | 4 +- substrate/client/network/sync/src/warp.rs | 38 ++++++++++++++++++- substrate/client/network/test/src/lib.rs | 5 +-- substrate/client/service/src/builder.rs | 5 ++- substrate/client/service/src/lib.rs | 2 +- 9 files changed, 47 insertions(+), 47 deletions(-) diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 95c887947c98..7f4eadaba7f8 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -732,7 +732,7 @@ pub fn new_full( }: NewFullParams, ) -> Result { use polkadot_node_network_protocol::request_response::IncomingRequest; - use sc_network_common::sync::warp::WarpSyncParams; + use sc_network_sync::warp::WarpSyncParams; let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled; let role = config.role.clone(); diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index ecca5c60db51..e49c60fe2fb7 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -32,8 +32,7 @@ use sc_client_api::{Backend, BlockBackend}; use sc_consensus_babe::{self, SlotProportion}; use sc_executor::NativeElseWasmExecutor; use sc_network::{event::Event, NetworkEventStream, NetworkService}; -use sc_network_common::sync::warp::WarpSyncParams; -use sc_network_sync::SyncingService; +use sc_network_sync::{warp::WarpSyncParams, SyncingService}; use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager}; use sc_statement_store::Store as StatementStore; use sc_telemetry::{Telemetry, TelemetryWorker}; diff --git a/substrate/client/network/common/src/sync/warp.rs b/substrate/client/network/common/src/sync/warp.rs index 043d77676412..be69686f1c1d 100644 --- a/substrate/client/network/common/src/sync/warp.rs +++ b/substrate/client/network/common/src/sync/warp.rs @@ -30,39 +30,6 @@ pub struct WarpProofRequest { pub begin: B::Hash, } -/// The different types of warp syncing. -pub enum WarpSyncParams { - /// Standard warp sync for the chain. - WithProvider(Arc>), - /// Skip downloading proofs and wait for a header of the state that should be downloaded. - /// - /// It is expected that the header provider ensures that the header is trusted. - WaitForTarget(oneshot::Receiver<::Header>), -} - -/// Warp sync configuration. -pub enum WarpSyncConfig { - /// Standard warp sync for the chain. - WithProvider(Arc>), - /// Skip downloading proofs and wait for a header of the state that should be downloaded. - /// - /// It is expected that the header provider ensures that the header is trusted. - WaitForTarget, -} - -impl WarpSyncParams { - /// Split `WarpSyncParams` into `WarpSyncConfig` and warp sync target block header receiver. - pub fn split( - self, - ) -> (WarpSyncConfig, Option::Header>>) { - match self { - WarpSyncParams::WithProvider(provider) => - (WarpSyncConfig::WithProvider(provider), None), - WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)), - } - } -} - /// Proof verification result. 
pub enum VerificationResult { /// Proof is valid, but the target was not reached. diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 75e5c7a0ccb6..8d258fe379fa 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -21,6 +21,7 @@ use crate::{ service::{self, chain_sync::ToServiceCommand}, + warp::WarpSyncParams, ChainSync, ClientError, SyncingService, }; @@ -44,7 +45,6 @@ use sc_network_common::{ role::Roles, sync::{ message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState}, - warp::WarpSyncParams, BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent, }, }; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 9ffc3e968e81..9ed749675f89 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -32,7 +32,7 @@ use crate::{ blocks::BlockCollection, schema::v1::{StateRequest, StateResponse}, state::StateSync, - warp::{WarpProofImportResult, WarpSync}, + warp::{WarpProofImportResult, WarpSync, WarpSyncConfig}, }; use codec::{Decode, DecodeAll, Encode}; @@ -63,7 +63,7 @@ use sc_network_common::{ BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }, - warp::{EncodedProof, WarpProofRequest, WarpSyncConfig, WarpSyncPhase, WarpSyncProgress}, + warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode, diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index 176981321f61..cfbc51123a44 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -22,13 +22,14 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; +use futures::channel::oneshot; use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock}, warp::{ - EncodedProof, VerificationResult, WarpProofRequest, WarpSyncConfig, WarpSyncPhase, - WarpSyncProgress, WarpSyncProvider, + EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, + WarpSyncProvider, }, }; use sp_blockchain::HeaderBackend; @@ -39,6 +40,39 @@ use std::sync::Arc; /// Log target for this file. const LOG_TARGET: &'static str = "sync"; +/// The different types of warp syncing, passed to `build_network`. +pub enum WarpSyncParams { + /// Standard warp sync for the chain. + WithProvider(Arc>), + /// Skip downloading proofs and wait for a header of the state that should be downloaded. + /// + /// It is expected that the header provider ensures that the header is trusted. + WaitForTarget(oneshot::Receiver<::Header>), +} + +/// Warp sync configuration as accepted by [`WarpSync`]. +pub enum WarpSyncConfig { + /// Standard warp sync for the chain. + WithProvider(Arc>), + /// Skip downloading proofs and wait for a header of the state that should be downloaded. + /// + /// It is expected that the header provider ensures that the header is trusted. 
+ WaitForTarget, +} + +impl WarpSyncParams { + /// Split `WarpSyncParams` into `WarpSyncConfig` and warp sync target block header receiver. + pub fn split( + self, + ) -> (WarpSyncConfig, Option::Header>>) { + match self { + WarpSyncParams::WithProvider(provider) => + (WarpSyncConfig::WithProvider(provider), None), + WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)), + } + } +} + /// Warp sync phase. enum Phase { /// Downloading warp proofs. diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 2a20da5a556b..d350b0e54ae1 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -62,15 +62,14 @@ use sc_network::{ }; use sc_network_common::{ role::Roles, - sync::warp::{ - AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider, - }, + sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, service::{chain_sync::SyncingService, network::NetworkServiceProvider}, state_request_handler::StateRequestHandler, + warp::WarpSyncParams, warp_request_handler, }; use sc_service::client::Client; diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index fe18d1d002d5..917b3be8dc7c 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -47,12 +47,13 @@ use sc_network::{ NetworkService, NetworkStateInfo, NetworkStatusProvider, }; use sc_network_bitswap::BitswapRequestHandler; -use sc_network_common::{role::Roles, sync::warp::WarpSyncParams}; +use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, engine::SyncingEngine, service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, - warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, + warp::WarpSyncParams, warp_request_handler::RequestHandler as WarpSyncRequestHandler, + SyncingService, }; use sc_rpc::{ author::AuthorApiServer, diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 0961967f9ca2..cd720e1c1e09 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -79,7 +79,7 @@ pub use sc_chain_spec::{ pub use sc_consensus::ImportQueue; pub use sc_executor::NativeExecutionDispatch; -pub use sc_network_common::sync::warp::WarpSyncParams; +pub use sc_network_sync::warp::WarpSyncParams; #[doc(hidden)] pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture}; pub use sc_rpc::{ From f665db207ff9d780b2b75e172e24f18d5a12ebfd Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 30 Aug 2023 18:06:57 +0300 Subject: [PATCH 3/7] Use `LOG_TARGET` constant instead of `"sync"` --- substrate/client/network/sync/src/engine.rs | 77 +++-- substrate/client/network/sync/src/lib.rs | 338 +++++++++++--------- 2 files changed, 229 insertions(+), 186 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 8d258fe379fa..9a71480cea8e 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -64,6 +64,9 @@ use std::{ time::{Duration, Instant}, }; +/// Log target 
for this file. +const LOG_TARGET: &'static str = "sync"; + /// Interval at which we perform time based maintenance const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100); @@ -296,7 +299,11 @@ where let max_blocks_per_request = if net_config.network_config.max_blocks_per_request > crate::MAX_BLOCKS_IN_RESPONSE as u32 { - log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE); + log::info!( + target: LOG_TARGET, + "clamping maximum blocks per request to {}", + crate::MAX_BLOCKS_IN_RESPONSE, + ); crate::MAX_BLOCKS_IN_RESPONSE as u32 } else { net_config.network_config.max_blocks_per_request @@ -422,7 +429,7 @@ where match Metrics::register(r, is_major_syncing.clone()) { Ok(metrics) => Some(metrics), Err(err) => { - log::error!(target: "sync", "Failed to register metrics {err:?}"); + log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}"); None }, } @@ -524,7 +531,11 @@ where let peer = match self.peers.get_mut(&who) { Some(p) => p, None => { - log::error!(target: "sync", "Received block announce from disconnected peer {}", who); + log::error!( + target: LOG_TARGET, + "Received block announce from disconnected peer {}", + who, + ); debug_assert!(false); return }, @@ -549,11 +560,11 @@ where let header = match self.client.header(hash) { Ok(Some(header)) => header, Ok(None) => { - log::warn!(target: "sync", "Trying to announce unknown block: {}", hash); + log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {}", hash); return }, Err(e) => { - log::warn!(target: "sync", "Error reading block header {}: {}", hash, e); + log::warn!(target: LOG_TARGET, "Error reading block header {}: {}", hash, e); return }, }; @@ -564,7 +575,7 @@ where } let is_best = self.client.info().best_hash == hash; - log::debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best); + log::debug!(target: LOG_TARGET, "Reannouncing block {:?} is_best: {}", hash, is_best); let data = data .or_else(|| self.block_announce_data_cache.get(&hash).cloned()) @@ -573,7 +584,7 @@ where for (who, ref mut peer) in self.peers.iter_mut() { let inserted = peer.known_blocks.insert(hash); if inserted { - log::trace!(target: "sync", "Announcing block {:?} to {}", hash, who); + log::trace!(target: LOG_TARGET, "Announcing block {:?} to {}", hash, who); let message = BlockAnnounce { header: header.clone(), state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) }, @@ -588,7 +599,7 @@ where /// Inform sync about new best imported block. pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor) { - log::debug!(target: "sync", "New best block imported {:?}/#{}", hash, number); + log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number); self.chain_sync.update_chain_info(&hash, number); self.network_service.set_notification_handshake( @@ -632,7 +643,10 @@ where // consider it connected or are also all stalled. In order to unstall the node, // disconnect all peers and allow `ProtocolController` to establish new connections. 
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD { - log::debug!(target: "sync", "syncing has halted due to inactivity, evicting all peers"); + log::debug!( + target: LOG_TARGET, + "syncing has halted due to inactivity, evicting all peers", + ); for peer in self.peers.keys() { self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM); @@ -671,7 +685,11 @@ where ToServiceCommand::JustificationImported(peer, hash, number, success) => { self.chain_sync.on_justification_import(hash, number, success); if !success { - log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer, hash); + log::info!( + target: LOG_TARGET, + "💔 Invalid justification provided by {} for #{}", + peer, hash, + ); self.network_service .disconnect_peer(peer, self.block_announce_protocol_name.clone()); self.network_service.report_peer( @@ -733,7 +751,7 @@ where }, Err(()) => { log::debug!( - target: "sync", + target: LOG_TARGET, "Failed to register peer {remote:?}: {received_handshake:?}", ); let _ = tx.send(false); @@ -742,7 +760,7 @@ where sc_network::SyncEvent::NotificationStreamClosed { remote } => { if self.on_sync_peer_disconnected(remote).is_err() { log::trace!( - target: "sync", + target: LOG_TARGET, "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", remote ); @@ -767,7 +785,7 @@ where } } else { log::trace!( - target: "sync", + target: LOG_TARGET, "Received sync for peer earlier refused by sync layer: {}", remote ); @@ -792,7 +810,7 @@ where }, Poll::Ready(Err(err)) => { log::error!( - target: "sync", + target: LOG_TARGET, "Failed to get target block for warp sync. Error: {err:?}", ); self.warp_sync_target_block_header_rx = None; @@ -817,9 +835,9 @@ where pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { if let Some(info) = self.peers.remove(&peer) { if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + log::warn!(target: LOG_TARGET, "Reserved peer {} disconnected", peer); } else { - log::debug!(target: "sync", "{} disconnected", peer); + log::debug!(target: LOG_TARGET, "{} disconnected", peer); } if !self.default_peers_set_no_slot_connected_peers.remove(&peer) && @@ -831,7 +849,7 @@ where }, None => { log::error!( - target: "sync", + target: LOG_TARGET, "trying to disconnect an inbound node which is not counted as inbound" ); debug_assert!(false); @@ -860,10 +878,14 @@ where sink: NotificationsSink, inbound: bool, ) -> Result<(), ()> { - log::trace!(target: "sync", "New peer {} {:?}", who, status); + log::trace!(target: LOG_TARGET, "New peer {} {:?}", who, status); if self.peers.contains_key(&who) { - log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who); + log::error!( + target: LOG_TARGET, + "Called on_sync_peer_connected with already connected peer {}", + who, + ); debug_assert!(false); return Err(()) } @@ -873,7 +895,7 @@ where if self.important_peers.contains(&who) { log::error!( - target: "sync", + target: LOG_TARGET, "Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})", who, self.genesis_hash, @@ -881,7 +903,7 @@ where ); } else if self.boot_node_ids.contains(&who) { log::error!( - target: "sync", + target: LOG_TARGET, "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", who, self.genesis_hash, @@ -889,7 +911,7 @@ where ); } else { log::debug!( - target: "sync", + target: LOG_TARGET, "Peer is on different chain (our genesis: {} theirs: {})", 
self.genesis_hash, status.genesis_hash ); @@ -906,7 +928,10 @@ where status.roles.is_full() && inbound && self.num_in_peers == self.max_in_peers { - log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}"); + log::debug!( + target: LOG_TARGET, + "All inbound slots have been consumed, rejecting {who}", + ); return Err(()) } @@ -916,7 +941,7 @@ where self.default_peers_set_no_slot_connected_peers.len() + this_peer_reserved_slot { - log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); + log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {}", who); return Err(()) } @@ -924,7 +949,7 @@ where (self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light { // Make sure that not all slots are occupied by light clients. - log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); + log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {}", who); return Err(()) } @@ -953,7 +978,7 @@ where None }; - log::debug!(target: "sync", "Connected {}", who); + log::debug!(target: LOG_TARGET, "Connected {}", who); self.peers.insert(who, peer); diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 9ed749675f89..7686956abd8b 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -107,6 +107,9 @@ pub mod state_request_handler; pub mod warp; pub mod warp_request_handler; +/// Log target for this file. +const LOG_TARGET: &'static str = "sync"; + /// Maximum blocks to store in the import queue. const MAX_IMPORTING_BLOCKS: usize = 2048; @@ -375,7 +378,7 @@ impl PeerSync { fn update_common_number(&mut self, new_common: NumberFor) { if self.common_number < new_common { trace!( - target: "sync", + target: LOG_TARGET, "Updating peer {} common number from={} => to={}.", self.peer_id, self.common_number, @@ -566,7 +569,7 @@ where // There is nothing sync can get from the node that has no blockchain data. match self.block_status(&best_hash) { Err(e) => { - debug!(target:"sync", "Error reading blockchain: {}", e); + debug!(target:LOG_TARGET, "Error reading blockchain: {e}"); Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR)) }, Ok(BlockStatus::KnownBad) => { @@ -584,7 +587,7 @@ where // an ancestor search, which is what we do in the next match case below. if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() { debug!( - target:"sync", + target:LOG_TARGET, "New peer with unknown best hash {} ({}), assuming common block.", self.best_queued_hash, self.best_queued_number @@ -605,10 +608,8 @@ where // If we are at genesis, just start downloading. 
let (state, req) = if self.best_queued_number.is_zero() { debug!( - target:"sync", - "New peer with best hash {} ({}).", - best_hash, - best_number, + target:LOG_TARGET, + "New peer with best hash {best_hash} ({best_number}).", ); (PeerSyncState::Available, None) @@ -616,7 +617,7 @@ where let common_best = std::cmp::min(self.best_queued_number, best_number); debug!( - target:"sync", + target:LOG_TARGET, "New peer with unknown best hash {} ({}), searching for common ancestor.", best_hash, best_number @@ -647,7 +648,7 @@ where if let SyncMode::Warp = self.mode { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { - log::debug!(target: "sync", "Starting warp state sync."); + log::debug!(target: LOG_TARGET, "Starting warp state sync."); if let Some(config) = self.warp_sync_config.take() { let mut warp_sync = WarpSync::new(self.client.clone(), config); @@ -664,10 +665,8 @@ where Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { debug!( - target: "sync", - "New peer with known best hash {} ({}).", - best_hash, - best_number, + target: LOG_TARGET, + "New peer with known best hash {best_hash} ({best_number}).", ); self.peers.insert( who, @@ -716,21 +715,23 @@ where .collect(); debug!( - target: "sync", - "Explicit sync request for block {:?} with no peers specified. \ - Syncing from these peers {:?} instead.", - hash, peers, + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with no peers specified. \ + Syncing from these peers {peers:?} instead.", ); } else { - debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers); + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with {peers:?}", + ); } if self.is_known(hash) { - debug!(target: "sync", "Refusing to sync known hash {:?}", hash); + debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); return } - trace!(target: "sync", "Downloading requested old fork {:?}", hash); + trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::AncestorSearch { .. 
} = peer.state { @@ -763,7 +764,7 @@ where let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(who) { let mut blocks = response.blocks; if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) { - trace!(target: "sync", "Reversing incoming block list"); + trace!(target: LOG_TARGET, "Reversing incoming block list"); blocks.reverse() } self.allowed_requests.add(who); @@ -814,17 +815,22 @@ where } }) .collect(); - debug!(target: "sync", "Drained {} gap blocks from {}", blocks.len(), gap_sync.best_queued_number); + debug!( + target: LOG_TARGET, + "Drained {} gap blocks from {}", + blocks.len(), + gap_sync.best_queued_number, + ); blocks } else { - debug!(target: "sync", "Unexpected gap block response from {}", who); + debug!(target: LOG_TARGET, "Unexpected gap block response from {who}"); return Err(BadPeer(*who, rep::NO_BLOCK)) } }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { - debug!(target: "sync", "Empty block response from {}", who); + debug!(target: LOG_TARGET, "Empty block response from {who}"); return Err(BadPeer(*who, rep::NO_BLOCK)) } validate_blocks::(&blocks, who, Some(request))?; @@ -853,7 +859,7 @@ where let matching_hash = match (blocks.get(0), self.client.hash(*current)) { (Some(block), Ok(maybe_our_block_hash)) => { trace!( - target: "sync", + target: LOG_TARGET, "Got ancestry block #{} ({}) from peer {}", current, block.hash, @@ -863,17 +869,15 @@ where }, (None, _) => { debug!( - target: "sync", - "Invalid response when searching for ancestor from {}", - who, + target: LOG_TARGET, + "Invalid response when searching for ancestor from {who}", ); return Err(BadPeer(*who, rep::UNKNOWN_ANCESTOR)) }, (_, Err(e)) => { info!( - target: "sync", - "❌ Error answering legitimate blockchain query: {}", - e, + target: LOG_TARGET, + "❌ Error answering legitimate blockchain query: {e}", ); return Err(BadPeer(*who, rep::BLOCKCHAIN_READ_ERROR)) }, @@ -891,7 +895,10 @@ where } } if matching_hash.is_none() && current.is_zero() { - trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); + trace!( + target:LOG_TARGET, + "Ancestry search: genesis mismatch for peer {who}", + ); return Err(BadPeer(*who, rep::GENESIS_MISMATCH)) } if let Some((next_state, next_num)) = @@ -907,7 +914,7 @@ where // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. trace!( - target: "sync", + target: LOG_TARGET, "Ancestry search complete. 
Ours={} ({}), Theirs={} ({}), Common={:?} ({})", self.best_queued_hash, self.best_queued_number, @@ -920,7 +927,7 @@ where peer.best_number < self.best_queued_number { trace!( - target: "sync", + target: LOG_TARGET, "Added fork target {} for {}", peer.best_hash, who, @@ -953,11 +960,11 @@ where return Err(BadPeer(*who, rep::VERIFICATION_FAIL)), } } else if blocks.is_empty() { - debug!(target: "sync", "Empty block response from {}", who); + debug!(target: LOG_TARGET, "Empty block response from {who}"); return Err(BadPeer(*who, rep::NO_BLOCK)) } else { debug!( - target: "sync", + target: LOG_TARGET, "Too many blocks ({}) in warp target block response from {}", blocks.len(), who, @@ -966,7 +973,7 @@ where } } else { debug!( - target: "sync", + target: LOG_TARGET, "Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.", who, ); @@ -1018,7 +1025,10 @@ where let peer = if let Some(peer) = self.peers.get_mut(&who) { peer } else { - error!(target: "sync", "💔 Called on_block_justification with a peer ID of an unknown peer"); + error!( + target: LOG_TARGET, + "💔 Called on_block_justification with a peer ID of an unknown peer", + ); return Ok(OnBlockJustification::Nothing) }; @@ -1030,7 +1040,7 @@ where let justification = if let Some(block) = response.blocks.into_iter().next() { if hash != block.hash { warn!( - target: "sync", + target: LOG_TARGET, "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", who, hash, @@ -1046,10 +1056,8 @@ where // we might have asked the peer for a justification on a block that we assumed it // had but didn't (regardless of whether it had a justification for it or not). trace!( - target: "sync", - "Peer {:?} provided empty response for justification request {:?}", - who, - hash, + target: LOG_TARGET, + "Peer {who:?} provided empty response for justification request {hash:?}", ); None @@ -1087,10 +1095,8 @@ where if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { if let Ok(Some(header)) = self.client.header(*hash) { log::debug!( - target: "sync", - "Starting state sync for #{} ({})", - number, - hash, + target: LOG_TARGET, + "Starting state sync for #{number} ({hash})", ); self.state_sync = Some(StateSync::new( self.client.clone(), @@ -1107,9 +1113,8 @@ where if let Err(err) = r { warn!( - target: "sync", - "💔 Error cleaning up pending extra justification data requests: {}", - err, + target: LOG_TARGET, + "💔 Error cleaning up pending extra justification data requests: {err}", ); } } @@ -1124,7 +1129,7 @@ where let header = &announce.header; let number = *header.number(); debug!( - target: "sync", + target: LOG_TARGET, "Pre-validating received block announcement {:?} with number {:?} from {}", hash, number, @@ -1135,10 +1140,8 @@ where self.block_announce_validation.push( async move { warn!( - target: "sync", - "💔 Ignored genesis block (#0) announcement from {}: {}", - who, - hash, + target: LOG_TARGET, + "💔 Ignored genesis block (#0) announcement from {who}: {hash}", ); PreValidateBlockAnnounce::Skip } @@ -1154,7 +1157,7 @@ where self.block_announce_validation.push( async move { warn!( - target: "sync", + target: LOG_TARGET, "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.", number, hash, @@ -1169,7 +1172,7 @@ where HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => { self.block_announce_validation.push(async move { warn!( - target: "sync", + target: LOG_TARGET, "💔 Ignored block (#{} -- {}) announcement from {} because all 
validation slots for this peer are occupied.", number, hash, @@ -1195,19 +1198,15 @@ where }, Ok(Validation::Failure { disconnect }) => { debug!( - target: "sync", - "Block announcement validation of block {:?} from {} failed", - hash, - who, + target: LOG_TARGET, + "Block announcement validation of block {hash:?} from {who} failed", ); PreValidateBlockAnnounce::Failure { who, disconnect } }, Err(e) => { debug!( - target: "sync", - "💔 Block announcement validation of block {:?} errored: {}", - hash, - e, + target: LOG_TARGET, + "💔 Block announcement validation of block {hash:?} errored: {e}", ); PreValidateBlockAnnounce::Error { who } }, @@ -1368,9 +1367,8 @@ where }, Err(err) => { log::warn!( - target: "sync", - "Failed to encode block request {:?}: {:?}", - opaque_req, err + target: LOG_TARGET, + "Failed to encode block request {opaque_req:?}: {err:?}", ); }, } @@ -1457,7 +1455,10 @@ where match SyncingMetrics::register(r) { Ok(metrics) => Some(metrics), Err(err) => { - error!(target: "sync", "Failed to register metrics for ChainSync: {err:?}"); + error!( + target: LOG_TARGET, + "Failed to register metrics for ChainSync: {err:?}", + ); None }, } @@ -1514,7 +1515,7 @@ where new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); if new_blocks.len() != orig_len { debug!( - target: "sync", + target: LOG_TARGET, "Ignoring {} blocks that are already queued", orig_len - new_blocks.len(), ); @@ -1531,7 +1532,7 @@ where .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) { trace!( - target:"sync", + target:LOG_TARGET, "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), h, @@ -1555,7 +1556,7 @@ where /// through all peers to update our view of their state as well. fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor) { if self.fork_targets.remove(hash).is_some() { - trace!(target: "sync", "Completed fork sync {:?}", hash); + trace!(target: LOG_TARGET, "Completed fork sync {hash:?}"); } if let Some(gap_sync) = &mut self.gap_sync { if number > gap_sync.best_queued_number && number <= gap_sync.target { @@ -1574,7 +1575,7 @@ where let new_common_number = if peer.best_number >= number { number } else { peer.best_number }; trace!( - target: "sync", + target: LOG_TARGET, "Updating peer {} info, ours={}, common={}->{}, their best={}", n, number, @@ -1639,7 +1640,7 @@ where match self.block_announce_validation_per_peer_stats.entry(*peer) { Entry::Vacant(_) => { error!( - target: "sync", + target: LOG_TARGET, "💔 Block announcement validation from peer {} finished for that no slot was allocated!", peer, ); @@ -1661,10 +1662,8 @@ where let (announce, is_best, who) = match pre_validation_result { PreValidateBlockAnnounce::Failure { who, disconnect } => { debug!( - target: "sync", - "Failed announce validation: {:?}, disconnect: {}", - who, - disconnect, + target: LOG_TARGET, + "Failed announce validation: {who:?}, disconnect: {disconnect}", ); return PollBlockAnnounceValidation::Failure { who, disconnect } }, @@ -1672,7 +1671,7 @@ where (announce, is_new_best, who), PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => { debug!( - target: "sync", + target: LOG_TARGET, "Ignored announce validation", ); return PollBlockAnnounceValidation::Skip @@ -1680,7 +1679,7 @@ where }; trace!( - target: "sync", + target: LOG_TARGET, "Finished block announce validation: from {:?}: {:?}. 
local_best={}", who, announce.summary(), @@ -1698,12 +1697,12 @@ where let peer = if let Some(peer) = self.peers.get_mut(&who) { peer } else { - error!(target: "sync", "💔 Called on_block_announce with a bad peer ID"); + error!(target: LOG_TARGET, "💔 Called on_block_announce with a bad peer ID"); return PollBlockAnnounceValidation::Nothing { is_best, who, announce } }; if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: "sync", "Peer state is ancestor search."); + trace!(target: LOG_TARGET, "Peer state is ancestor search."); return PollBlockAnnounceValidation::Nothing { is_best, who, announce } } @@ -1728,7 +1727,7 @@ where // known block case if known || self.is_already_downloading(&hash) { - trace!(target: "sync", "Known block announce from {}: {}", who, hash); + trace!(target: LOG_TARGET, "Known block announce from {who}: {hash}"); if let Some(target) = self.fork_targets.get_mut(&hash) { target.peers.insert(who); } @@ -1737,7 +1736,7 @@ where if ancient_parent { trace!( - target: "sync", + target: LOG_TARGET, "Ignored ancient block announced from {}: {} {:?}", who, hash, @@ -1748,7 +1747,7 @@ where if self.status().state == SyncState::Idle { trace!( - target: "sync", + target: LOG_TARGET, "Added sync target for block announced from {}: {} {:?}", who, hash, @@ -1774,10 +1773,15 @@ where fn restart(&mut self) -> impl Iterator), BadPeer>> + '_ { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { - warn!(target: "sync", "💔 Unable to restart sync: {}", e); + warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); } self.allowed_requests.set_all(); - debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); + debug!( + target: LOG_TARGET, + "Restarted with {} ({})", + self.best_queued_number, + self.best_queued_hash, + ); let old_peers = std::mem::take(&mut self.peers); old_peers.into_iter().filter_map(move |(id, mut p)| { @@ -1808,14 +1812,14 @@ where let info = self.client.info(); if matches!(self.mode, SyncMode::LightState { .. }) && info.finalized_state.is_some() { warn!( - target: "sync", + target: LOG_TARGET, "Can't use fast sync mode with a partially synced database. Reverting to full sync mode." ); self.mode = SyncMode::Full; } if matches!(self.mode, SyncMode::Warp) && info.finalized_state.is_some() { warn!( - target: "sync", + target: LOG_TARGET, "Can't use warp sync mode with a partially synced database. Reverting to full sync mode." ); self.mode = SyncMode::Full; @@ -1830,25 +1834,30 @@ where self.import_existing = true; // Latest state is missing, start with the last finalized state or genesis instead. 
if let Some((hash, number)) = info.finalized_state { - debug!(target: "sync", "Starting from finalized state #{}", number); + debug!(target: LOG_TARGET, "Starting from finalized state #{number}"); self.best_queued_hash = hash; self.best_queued_number = number; } else { - debug!(target: "sync", "Restarting from genesis"); + debug!(target: LOG_TARGET, "Restarting from genesis"); self.best_queued_hash = Default::default(); self.best_queued_number = Zero::zero(); } } if let Some((start, end)) = info.block_gap { - debug!(target: "sync", "Starting gap sync #{} - #{}", start, end); + debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}"); self.gap_sync = Some(GapSync { best_queued_number: start - One::one(), target: end, blocks: BlockCollection::new(), }); } - trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash); + trace!( + target: LOG_TARGET, + "Restarted sync at #{} ({:?})", + self.best_queued_number, + self.best_queued_hash, + ); Ok(()) } @@ -1925,7 +1934,7 @@ where // Find a random peer that has a block with the target number. for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.best_number >= target_number { - trace!(target: "sync", "New warp target block request for {}", id); + trace!(target: LOG_TARGET, "New warp target block request for {id}"); peer.state = PeerSyncState::DownloadingWarpTargetBlock; self.allowed_requests.clear(); return Some((*id, request)) @@ -2016,9 +2025,8 @@ where }, Err(err) => { log::warn!( - target: "sync", - "Failed to encode state request {:?}: {:?}", - request, err + target: LOG_TARGET, + "Failed to encode state request {request:?}: {err:?}", ); }, } @@ -2042,9 +2050,8 @@ where ), None => { log::warn!( - target: "sync", - "Trying to send warp sync request when no protocol is configured {:?}", - request, + target: LOG_TARGET, + "Trying to send warp sync request when no protocol is configured {request:?}", ); }, } @@ -2059,7 +2066,12 @@ where let blocks = match self.block_response_into_blocks(&request, response) { Ok(blocks) => blocks, Err(err) => { - debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err); + debug!( + target: LOG_TARGET, + "Failed to decode block response from {}: {}", + peer_id, + err, + ); self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); return None }, @@ -2079,7 +2091,7 @@ where _ => Default::default(), }; trace!( - target: "sync", + target: LOG_TARGET, "BlockResponse {} from {} with {} blocks {}", block_response.id, peer_id, @@ -2188,10 +2200,8 @@ where Ok(proto) => proto, Err(e) => { debug!( - target: "sync", - "Failed to decode block response from peer {:?}: {:?}.", - id, - e + target: LOG_TARGET, + "Failed to decode block response from peer {id:?}: {e:?}.", ); self.network_service.report_peer(id, rep::BAD_MESSAGE); self.network_service @@ -2209,10 +2219,8 @@ where Ok(proto) => proto, Err(e) => { debug!( - target: "sync", - "Failed to decode state response from peer {:?}: {:?}.", - id, - e + target: LOG_TARGET, + "Failed to decode state response from peer {id:?}: {e:?}.", ); self.network_service.report_peer(id, rep::BAD_MESSAGE); self.network_service @@ -2230,7 +2238,7 @@ where }, }, Ok(Err(e)) => { - debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e); + debug!(target: LOG_TARGET, "Request to peer {id:?} failed: {e:?}."); match e { RequestFailure::Network(OutboundFailure::Timeout) => { @@ -2271,9 +2279,8 @@ where }, Err(oneshot::Canceled) => { trace!( - target: "sync", - "Request to peer {:?} 
failed due to oneshot being canceled.", - id, + target: LOG_TARGET, + "Request to peer {id:?} failed due to oneshot being canceled.", ); self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); @@ -2358,7 +2365,7 @@ where } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); + trace!(target: LOG_TARGET, "Too many blocks in the queue."); return Vec::new() } let is_major_syncing = self.status().state.is_major_syncing(); @@ -2393,7 +2400,7 @@ where queue.len() <= MAJOR_SYNC_BLOCKS.into() { trace!( - target: "sync", + target: LOG_TARGET, "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.", id, peer.common_number, @@ -2418,7 +2425,7 @@ where ) { peer.state = PeerSyncState::DownloadingNew(range.start); trace!( - target: "sync", + target: LOG_TARGET, "New block request for {}, (best:{}, common:{}) {:?}", id, peer.best_number, @@ -2441,7 +2448,7 @@ where }, max_blocks_per_request, ) { - trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); + trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}"); peer.state = PeerSyncState::DownloadingStale(hash); Some((id, req)) } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { @@ -2457,7 +2464,7 @@ where }) { peer.state = PeerSyncState::DownloadingGap(range.start); trace!( - target: "sync", + target: LOG_TARGET, "New gap block request for {}, (best:{}, common:{}) {:?}", id, peer.best_number, @@ -2492,7 +2499,7 @@ where if peer.state.is_available() && peer.common_number >= sync.target_block_num() { peer.state = PeerSyncState::DownloadingState; let request = sync.next_request(); - trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); + trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); self.allowed_requests.clear(); return Some((*id, OpaqueStateRequest(Box::new(request)))) } @@ -2507,7 +2514,7 @@ where { for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.best_number >= target { - trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); + trace!(target: LOG_TARGET, "New StateRequest for {id}: {request:?}"); peer.state = PeerSyncState::DownloadingState; self.allowed_requests.clear(); return Some((*id, OpaqueStateRequest(Box::new(request)))) @@ -2537,7 +2544,7 @@ where // Find a random peer that is synced as much as peer majority. for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.best_number >= median { - trace!(target: "sync", "New WarpProofRequest for {}", id); + trace!(target: LOG_TARGET, "New WarpProofRequest for {id}"); peer.state = PeerSyncState::DownloadingWarpProof; self.allowed_requests.clear(); return Some((*id, request)) @@ -2556,7 +2563,7 @@ where ) -> Result, BadPeer> { let response: Box = response.0.downcast().map_err(|_error| { error!( - target: "sync", + target: LOG_TARGET, "Failed to downcast opaque state response, this is an implementation bug." 
); @@ -2571,7 +2578,7 @@ where } let import_result = if let Some(sync) = &mut self.state_sync { debug!( - target: "sync", + target: LOG_TARGET, "Importing state data from {} with {} keys, {} proof nodes.", who, response.entries.len(), @@ -2580,7 +2587,7 @@ where sync.import(*response) } else if let Some(sync) = &mut self.warp_sync { debug!( - target: "sync", + target: LOG_TARGET, "Importing state data from {} with {} keys, {} proof nodes.", who, response.entries.len(), @@ -2588,7 +2595,7 @@ where ); sync.import_state(*response) } else { - debug!(target: "sync", "Ignored obsolete state response from {}", who); + debug!(target: LOG_TARGET, "Ignored obsolete state response from {who}"); return Err(BadPeer(*who, rep::NOT_REQUESTED)) }; @@ -2607,12 +2614,12 @@ where skip_execution: self.skip_execution(), state: Some(state), }; - debug!(target: "sync", "State download is complete. Import is queued"); + debug!(target: LOG_TARGET, "State download is complete. Import is queued"); Ok(OnStateData::Import(origin, block)) }, state::ImportResult::Continue => Ok(OnStateData::Continue), state::ImportResult::BadResponse => { - debug!(target: "sync", "Bad state data received from {}", who); + debug!(target: LOG_TARGET, "Bad state data received from {who}"); Err(BadPeer(*who, rep::BAD_BLOCK)) }, } @@ -2627,21 +2634,21 @@ where } let import_result = if let Some(sync) = &mut self.warp_sync { debug!( - target: "sync", + target: LOG_TARGET, "Importing warp proof data from {}, {} bytes.", who, response.0.len(), ); sync.import_warp_proof(response) } else { - debug!(target: "sync", "Ignored obsolete warp sync response from {}", who); + debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {who}"); return Err(BadPeer(*who, rep::NOT_REQUESTED)) }; match import_result { WarpProofImportResult::Success => Ok(()), WarpProofImportResult::BadResponse => { - debug!(target: "sync", "Bad proof data received from {}", who); + debug!(target: LOG_TARGET, "Bad proof data received from {who}"); Err(BadPeer(*who, rep::BAD_BLOCK)) }, } @@ -2679,7 +2686,7 @@ where count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) -> Box), BadPeer>>> { - trace!(target: "sync", "Imported {} of {}", imported, count); + trace!(target: LOG_TARGET, "Imported {imported} of {count}"); let mut output = Vec::new(); @@ -2706,7 +2713,7 @@ where Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => { if aux.clear_justification_requests { trace!( - target: "sync", + target: LOG_TARGET, "Block imported clears all pending justification requests {number}: {hash:?}", ); self.clear_justification_requests(); @@ -2714,7 +2721,7 @@ where if aux.needs_justification { trace!( - target: "sync", + target: LOG_TARGET, "Block imported but requires justification {number}: {hash:?}", ); self.request_justification(&hash, number); @@ -2734,7 +2741,7 @@ where self.state_sync.as_ref().map_or(false, |s| s.target() == hash); if state_sync_complete { info!( - target: "sync", + target: LOG_TARGET, "State sync is complete ({} MiB), restarting block sync.", self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), ); @@ -2748,7 +2755,7 @@ where .map_or(false, |s| s.target_block_hash() == Some(hash)); if warp_sync_complete { info!( - target: "sync", + target: LOG_TARGET, "Warp sync is complete ({} MiB), restarting block sync.", self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)), ); @@ -2760,7 +2767,7 @@ where self.gap_sync.as_ref().map_or(false, |s| s.target == number); if gap_sync_complete { info!( - 
target: "sync", + target: LOG_TARGET, "Block history download is complete." ); self.gap_sync = None; @@ -2769,7 +2776,7 @@ where Err(BlockImportError::IncompleteHeader(who)) => if let Some(peer) = who { warn!( - target: "sync", + target: LOG_TARGET, "💔 Peer sent block with incomplete header to import", ); output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); @@ -2780,7 +2787,7 @@ where who.map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); warn!( - target: "sync", + target: LOG_TARGET, "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", ); @@ -2793,7 +2800,7 @@ where Err(BlockImportError::BadBlock(who)) => if let Some(peer) = who { warn!( - target: "sync", + target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); @@ -2802,10 +2809,10 @@ where // This may happen if the chain we were requesting upon has been discarded // in the meantime because other chain has been finalized. // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: "sync", "Obsolete block {hash:?}"); + trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); }, e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { - warn!(target: "sync", "💔 Error importing block {hash:?}: {}", e.unwrap_err()); + warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); self.state_sync = None; self.warp_sync = None; output.extend(self.restart()); @@ -2925,7 +2932,7 @@ fn peer_block_request( return None } else if peer.common_number < finalized { trace!( - target: "sync", + target: LOG_TARGET, "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}", id, peer.common_number, finalized, peer.best_number, best_num, ); @@ -3004,11 +3011,21 @@ fn fork_sync_request( ) -> Option<(B::Hash, BlockRequest)> { targets.retain(|hash, r| { if r.number <= finalized { - trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number); + trace!( + target: LOG_TARGET, + "Removed expired fork sync request {:?} (#{})", + hash, + r.number, + ); return false } if check_block(hash) != BlockStatus::Unknown { - trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, r.number); + trace!( + target: LOG_TARGET, + "Removed obsolete fork sync request {:?} (#{})", + hash, + r.number, + ); return false } true @@ -3029,7 +3046,10 @@ fn fork_sync_request( // request only single block 1 }; - trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count); + trace!( + target: LOG_TARGET, + "Downloading requested fork {hash:?} from {id}, {count} blocks", + ); return Some(( *hash, BlockRequest:: { @@ -3041,7 +3061,7 @@ fn fork_sync_request( }, )) } else { - trace!(target: "sync", "Fork too far in the future: {:?} (#{})", hash, r.number); + trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number); } } None @@ -3078,7 +3098,7 @@ fn validate_blocks( if let Some(request) = request { if Some(blocks.len() as _) > request.max { debug!( - target: "sync", + target: LOG_TARGET, "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.", who, request.max, @@ -3099,7 +3119,7 @@ fn validate_blocks( if !expected_block { debug!( - target: "sync", + target: LOG_TARGET, "Received block that was not requested. 
Requested {:?}, got {:?}.", request.from, block_header, @@ -3112,9 +3132,8 @@ fn validate_blocks( blocks.iter().any(|b| b.header.is_none()) { trace!( - target: "sync", - "Missing requested header for a block in response from {}.", - who, + target: LOG_TARGET, + "Missing requested header for a block in response from {who}.", ); return Err(BadPeer(*who, rep::BAD_RESPONSE)) @@ -3123,9 +3142,8 @@ fn validate_blocks( if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) { trace!( - target: "sync", - "Missing requested body for a block in response from {}.", - who, + target: LOG_TARGET, + "Missing requested body for a block in response from {who}.", ); return Err(BadPeer(*who, rep::BAD_RESPONSE)) @@ -3137,7 +3155,7 @@ fn validate_blocks( let hash = header.hash(); if hash != b.hash { debug!( - target:"sync", + target:LOG_TARGET, "Bad header received from {}. Expected hash {:?}, got {:?}", who, b.hash, @@ -3154,7 +3172,7 @@ fn validate_blocks( ); if expected != got { debug!( - target:"sync", + target:LOG_TARGET, "Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}", b.hash, who, @@ -3403,7 +3421,7 @@ mod test { ) -> BlockRequest { let requests = sync.block_requests(); - log::trace!(target: "sync", "Requests: {:?}", requests); + log::trace!(target: LOG_TARGET, "Requests: {requests:?}"); assert_eq!(1, requests.len()); assert_eq!(*peer, requests[0].0); @@ -3783,7 +3801,7 @@ mod test { break }; - log::trace!(target: "sync", "Request: {:?}", request); + log::trace!(target: LOG_TARGET, "Request: {request:?}"); } // Now request and import the fork. @@ -3926,7 +3944,7 @@ mod test { break }; - log::trace!(target: "sync", "Request: {:?}", request); + log::trace!(target: LOG_TARGET, "Request: {request:?}"); } // Now request and import the fork. From 77432a30e9d9dd29e72cccce8d2d6629b0f62c17 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 31 Aug 2023 12:29:29 +0300 Subject: [PATCH 4/7] Fuse target block channel --- .../client/network/common/src/sync/warp.rs | 3 +- substrate/client/network/sync/src/engine.rs | 44 +++++++++++-------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/substrate/client/network/common/src/sync/warp.rs b/substrate/client/network/common/src/sync/warp.rs index be69686f1c1d..97a38d3ffb8b 100644 --- a/substrate/client/network/common/src/sync/warp.rs +++ b/substrate/client/network/common/src/sync/warp.rs @@ -15,10 +15,9 @@ // along with Substrate. If not, see . use codec::{Decode, Encode}; -use futures::channel::oneshot; pub use sp_consensus_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor}; -use std::{fmt, sync::Arc}; +use std::fmt; /// Scale-encoded warp sync proof response. pub struct EncodedProof(pub Vec); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 9a71480cea8e..e43506d9c6f8 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -26,7 +26,11 @@ use crate::{ }; use codec::{Decode, Encode}; -use futures::{channel::oneshot, FutureExt, StreamExt}; +use futures::{ + channel::oneshot, + future::{BoxFuture, Fuse}, + FutureExt, StreamExt, +}; use futures_timer::Delay; use libp2p::PeerId; use prometheus_endpoint::{ @@ -249,7 +253,8 @@ pub struct SyncingEngine { boot_node_ids: HashSet, /// A channel to get target block header if we skip over proofs downloading during warp sync. 
- warp_sync_target_block_header_rx: Option::Header>>, + warp_sync_target_block_header_rx: + Fuse>>, /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, @@ -359,11 +364,16 @@ where // Split warp sync params into warp sync config and a channel to retreive target block // header. let (warp_sync_config, warp_sync_target_block_header_rx) = - warp_sync_params.map_or((None, None), |p| { - let (config, target_block_rx) = p.split(); + warp_sync_params.map_or((None, None), |params| { + let (config, target_block_rx) = params.split(); (Some(config), target_block_rx) }); + // Make sure polling of the target block channel is a no-op if there is no block to + // retrieve. + let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx + .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); + let (chain_sync, block_announce_config) = ChainSync::new( mode, client.clone(), @@ -802,21 +812,17 @@ where // Retreive warp sync target block header just before polling `ChainSync` // to make progress as soon as we receive it. - if let Some(ref mut target_block_header_rx) = self.warp_sync_target_block_header_rx { - match target_block_header_rx.poll_unpin(cx) { - Poll::Ready(Ok(target)) => { - self.chain_sync.set_warp_sync_target_block(target); - self.warp_sync_target_block_header_rx = None; - }, - Poll::Ready(Err(err)) => { - log::error!( - target: LOG_TARGET, - "Failed to get target block for warp sync. Error: {err:?}", - ); - self.warp_sync_target_block_header_rx = None; - }, - Poll::Pending => {}, - } + match self.warp_sync_target_block_header_rx.poll_unpin(cx) { + Poll::Ready(Ok(target)) => { + self.chain_sync.set_warp_sync_target_block(target); + }, + Poll::Ready(Err(err)) => { + log::error!( + target: LOG_TARGET, + "Failed to get target block for warp sync. 
Error: {err:?}", + ); + }, + Poll::Pending => {}, } // poll `ChainSync` last because of a block announcement was received through the From 89584c9e4eb2f80c67bb65eb950178b9ccb64e73 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 31 Aug 2023 12:37:52 +0300 Subject: [PATCH 5/7] minor: pass variables inline to log macros --- substrate/client/network/sync/src/engine.rs | 34 +++++++++------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index e43506d9c6f8..261ff6a40175 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -543,8 +543,7 @@ where None => { log::error!( target: LOG_TARGET, - "Received block announce from disconnected peer {}", - who, + "Received block announce from disconnected peer {who}", ); debug_assert!(false); return @@ -570,11 +569,11 @@ where let header = match self.client.header(hash) { Ok(Some(header)) => header, Ok(None) => { - log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {}", hash); + log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}"); return }, Err(e) => { - log::warn!(target: LOG_TARGET, "Error reading block header {}: {}", hash, e); + log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}"); return }, }; @@ -585,7 +584,7 @@ where } let is_best = self.client.info().best_hash == hash; - log::debug!(target: LOG_TARGET, "Reannouncing block {:?} is_best: {}", hash, is_best); + log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}"); let data = data .or_else(|| self.block_announce_data_cache.get(&hash).cloned()) @@ -594,7 +593,7 @@ where for (who, ref mut peer) in self.peers.iter_mut() { let inserted = peer.known_blocks.insert(hash); if inserted { - log::trace!(target: LOG_TARGET, "Announcing block {:?} to {}", hash, who); + log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {who}"); let message = BlockAnnounce { header: header.clone(), state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) }, @@ -609,7 +608,7 @@ where /// Inform sync about new best imported block. 
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor) { - log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number); + log::debug!(target: LOG_TARGET, "New best block imported {hash:?}/#{number}"); self.chain_sync.update_chain_info(&hash, number); self.network_service.set_notification_handshake( @@ -697,8 +696,7 @@ where if !success { log::info!( target: LOG_TARGET, - "💔 Invalid justification provided by {} for #{}", - peer, hash, + "💔 Invalid justification provided by {peer} for #{hash}", ); self.network_service .disconnect_peer(peer, self.block_announce_protocol_name.clone()); @@ -796,8 +794,7 @@ where } else { log::trace!( target: LOG_TARGET, - "Received sync for peer earlier refused by sync layer: {}", - remote + "Received sync for peer earlier refused by sync layer: {remote}", ); } } @@ -841,9 +838,9 @@ where pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { if let Some(info) = self.peers.remove(&peer) { if self.important_peers.contains(&peer) { - log::warn!(target: LOG_TARGET, "Reserved peer {} disconnected", peer); + log::warn!(target: LOG_TARGET, "Reserved peer {peer} disconnected"); } else { - log::debug!(target: LOG_TARGET, "{} disconnected", peer); + log::debug!(target: LOG_TARGET, "{peer} disconnected"); } if !self.default_peers_set_no_slot_connected_peers.remove(&peer) && @@ -884,13 +881,12 @@ where sink: NotificationsSink, inbound: bool, ) -> Result<(), ()> { - log::trace!(target: LOG_TARGET, "New peer {} {:?}", who, status); + log::trace!(target: LOG_TARGET, "New peer {who} {status:?}"); if self.peers.contains_key(&who) { log::error!( target: LOG_TARGET, - "Called on_sync_peer_connected with already connected peer {}", - who, + "Called on_sync_peer_connected with already connected peer {who}", ); debug_assert!(false); return Err(()) @@ -947,7 +943,7 @@ where self.default_peers_set_no_slot_connected_peers.len() + this_peer_reserved_slot { - log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {}", who); + log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {who}"); return Err(()) } @@ -955,7 +951,7 @@ where (self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light { // Make sure that not all slots are occupied by light clients. - log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {}", who); + log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {who}"); return Err(()) } @@ -984,7 +980,7 @@ where None }; - log::debug!(target: LOG_TARGET, "Connected {}", who); + log::debug!(target: LOG_TARGET, "Connected {who}"); self.peers.insert(who, peer); From 3ef5489ef2fa9127b142678a25be48675662ab5c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 4 Sep 2023 18:59:34 +0300 Subject: [PATCH 6/7] Fix merge issues --- substrate/client/network/sync/src/engine.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 2cfbc5ba2b69..23847d16c972 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -52,7 +52,6 @@ use sc_network_common::{ role::Roles, sync::{ message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState}, - warp::WarpSyncParams, BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent, }, }; @@ -835,8 +834,8 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. 
pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> { - if let Some(info) = self.peers.remove(&peer) { - if self.important_peers.contains(&peer) { + if let Some(info) = self.peers.remove(&peer_id) { + if self.important_peers.contains(&peer_id) { log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected"); } else { log::debug!(target: LOG_TARGET, "{peer_id} disconnected"); @@ -883,7 +882,7 @@ where ) -> Result<(), ()> { log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}"); - if self.peers.contains_key(&who) { + if self.peers.contains_key(&peer_id) { log::error!( target: LOG_TARGET, "Called on_sync_peer_connected with already connected peer {peer_id}", From 6faccdae532934b6ae40df2efca387b44daff55d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 5 Sep 2023 11:46:39 +0300 Subject: [PATCH 7/7] minor: spelling --- substrate/client/network/sync/src/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index cfbc51123a44..74835a6e015e 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -144,7 +144,7 @@ where let Phase::PendingTargetBlock = self.phase else { error!( target: LOG_TARGET, - "Attempt to set warp sync target block in invalide phase.", + "Attempt to set warp sync target block in invalid phase.", ); debug_assert!(false); return
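
For reference, the pattern [PATCH 4/7] applies to `warp_sync_target_block_header_rx` can be reduced to a minimal standalone sketch: an optional `oneshot::Receiver` is wrapped so it is always safe to poll, with `futures::future::pending()` standing in when there is no channel and `Fuse` turning post-completion polls into no-ops. This is a sketch only, not part of the patch; the `into_always_pollable` helper and the `u32` payload are hypothetical stand-ins for the block-header channel, and only the `futures` crate is assumed.

// Illustrative sketch only — not part of the patch series above.
use futures::{
    channel::oneshot,
    executor::block_on,
    future::{BoxFuture, Fuse},
    FutureExt,
};

/// `None` becomes a future that never resolves, so the caller can poll the result
/// unconditionally; `Fuse` makes polling after completion return `Pending` instead
/// of panicking, which removes the need for manual `Option` bookkeeping.
fn into_always_pollable(
    rx: Option<oneshot::Receiver<u32>>,
) -> Fuse<BoxFuture<'static, Result<u32, oneshot::Canceled>>> {
    rx.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse())
}

fn main() {
    // With a sender: the fused future yields the value once...
    let (tx, rx) = oneshot::channel();
    let mut fut = into_always_pollable(Some(rx));
    tx.send(42).unwrap();
    assert_eq!(block_on(&mut fut), Ok(42));
    // ...and polling it again after completion simply stays pending.
    assert!((&mut fut).now_or_never().is_none());

    // Without a sender there is nothing to wait for: the future never resolves,
    // so an event loop can include it without a `Some`/`None` check.
    let mut never = into_always_pollable(None);
    assert!((&mut never).now_or_never().is_none());
}

This is what lets the engine replace the `Option<oneshot::Receiver<_>>` plus manual resetting to `None` with a single field that the event loop polls on every iteration.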
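
[PATCH 5/7] relies on implicit format-argument capture: plain identifiers such as `who` or `peer_id` can be named directly inside the format string, while expressions like `response.entries.len()` still have to be passed positionally. A small sketch of the same style, assuming the `log` crate with `env_logger` as a hypothetical backend and a made-up peer id:

// Illustrative sketch only — names and values are examples, not patch code.
fn main() {
    env_logger::init();
    let peer_id = "12D3KooWEXAMPLE";
    let entries: Vec<u8> = vec![1, 2, 3];
    // Identifiers in scope can be captured by name inside the format string...
    log::debug!(target: "sync", "{peer_id} disconnected");
    // ...but expressions must still be passed as positional arguments.
    log::debug!(target: "sync", "Importing {} entries from {peer_id}", entries.len());
}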