Skip to content

Commit

Permalink
Support announcing to other peers sync status of local peer and take …
Browse files Browse the repository at this point in the history
…that into account when deciding if node is under major sync or not.

Add pending sync status to distinguish state where blocks are imported, but sync target is not yet identified due to no synced nodes known.
  • Loading branch information
nazar-pc committed Jan 15, 2024
1 parent 65171c0 commit df7768a
Show file tree
Hide file tree
Showing 17 changed files with 151 additions and 41 deletions.
1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ fn get_block_announce_proto_config<B: BlockT>(
best_number,
best_hash,
genesis_hash,
true,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
Expand Down
9 changes: 9 additions & 0 deletions substrate/client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ pub struct NetworkParams {
/// and observe block requests timing out.
#[arg(long, value_name = "COUNT", default_value_t = 64)]
pub max_blocks_per_request: u32,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
///
/// `--dev` enables this option automatically.
#[clap(long)]
pub force_synced: bool,
}

impl NetworkParams {
Expand Down Expand Up @@ -267,6 +275,7 @@ impl NetworkParams {
ipfs_server: self.ipfs_server,
sync_mode: self.sync.into(),
pause_sync: Arc::new(AtomicBool::new(false)),
force_synced: self.force_synced || is_dev,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<B: BlockT> InformantDisplay<B> {
(state.size as f32) / (1024f32 * 1024f32)
),
),
(SyncState::Pending, _, _) => ("⏳", "Pending".into(), "".into()),
(SyncState::Idle, _, _) => ("💤", "Idle".into(), "".into()),
(SyncState::Downloading { target }, _, _) =>
("⚙️ ", format!("Syncing{}", speed), format!(", target=#{target}")),
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/network/common/src/sync/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub mod generic {
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
/// Whether peer is synced.
pub is_synced: bool,
/// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common.
pub state: Option<BlockState>,
/// Data associated with this block announcement, e.g. a candidate message.
Expand All @@ -202,6 +204,7 @@ pub mod generic {
impl<H: Encode> Encode for BlockAnnounce<H> {
fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
self.header.encode_to(dest);
self.is_synced.encode_to(dest);
if let Some(state) = &self.state {
state.encode_to(dest);
}
Expand All @@ -215,8 +218,9 @@ pub mod generic {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
let header = H::decode(input)?;
let state = BlockState::decode(input).ok();
let is_synced = bool::decode(input)?;
let data = Vec::decode(input).ok();
Ok(Self { header, state, data })
Ok(Self { header, is_synced, state, data })
}
}
}
Expand All @@ -232,6 +236,8 @@ pub struct BlockAnnouncesHandshake<B: BlockT> {
pub best_hash: B::Hash,
/// Genesis block hash.
pub genesis_hash: B::Hash,
/// Whether peer is synced.
pub is_synced: bool,
}

impl<B: BlockT> BlockAnnouncesHandshake<B> {
Expand All @@ -240,7 +246,8 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
is_synced: bool,
) -> Self {
Self { genesis_hash, roles, best_number, best_hash }
Self { genesis_hash, roles, best_number, best_hash, is_synced }
}
}
6 changes: 6 additions & 0 deletions substrate/client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,11 @@ pub struct NetworkConfiguration {
/// a modification of the way the implementation works. Different nodes with different
/// configured values remain compatible with each other.
pub yamux_window_size: Option<u32>,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
pub force_synced: bool,
}

impl NetworkConfiguration {
Expand Down Expand Up @@ -692,6 +697,7 @@ impl NetworkConfiguration {
.expect("value is a constant; constant is non-zero; qed."),
yamux_window_size: None,
ipfs_server: false,
force_synced: false,
}
}

Expand Down
6 changes: 6 additions & 0 deletions substrate/client/network/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ pub mod generic {
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Whether peer is synced.
pub is_synced: bool,
}

/// Status sent on connection.
Expand All @@ -153,6 +155,8 @@ pub mod generic {
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Whether peer is synced.
pub is_synced: bool,
/// DEPRECATED. Chain-specific status.
pub chain_status: Vec<u8>,
}
Expand All @@ -178,6 +182,7 @@ pub mod generic {
best_number,
best_hash,
genesis_hash,
is_synced,
} = compact;

Ok(Self {
Expand All @@ -187,6 +192,7 @@ pub mod generic {
best_number,
best_hash,
genesis_hash,
is_synced,
chain_status,
})
}
Expand Down
37 changes: 34 additions & 3 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Are we actively catching up with the chain?
is_major_syncing: Arc<AtomicBool>,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
force_synced: bool,

/// Network service.
network_service: service::network::NetworkServiceHandle,

Expand Down Expand Up @@ -346,6 +351,7 @@ where
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
peer_store_handle: PeerStoreHandle,
force_synced: bool,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = net_config.network_config.sync_mode;
let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
Expand Down Expand Up @@ -427,6 +433,7 @@ where
.ok()
.flatten()
.expect("Genesis block exists; qed"),
force_synced,
);

// Split warp sync params into warp sync config and a channel to retreive target block
Expand Down Expand Up @@ -483,6 +490,7 @@ where
),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
force_synced: net_config.network_config.force_synced,
service_rx,
genesis_hash,
important_peers,
Expand Down Expand Up @@ -522,6 +530,15 @@ where
))
}

fn is_synced(&self) -> bool {
if self.force_synced {
return true
}

!self.is_major_syncing.load(Ordering::Relaxed) &&
self.peers.iter().any(|(_peer_id, peer)| peer.info.is_synced)
}

/// Report Prometheus metrics.
pub fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
Expand All @@ -536,10 +553,12 @@ where
peer_id: &PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>,
is_synced: bool,
) {
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
peer.info.best_hash = best_hash;
peer.info.best_number = best_number;
peer.info.is_synced = is_synced;
}
}

Expand All @@ -551,10 +570,10 @@ where
match validation_result {
BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
if let Some((best_hash, best_number)) =
if let Some((best_hash, best_number, is_synced)) =
self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
{
self.update_peer_info(&peer_id, best_hash, best_number);
self.update_peer_info(&peer_id, best_hash, best_number, is_synced);
}

if let Some(data) = announce.data {
Expand Down Expand Up @@ -628,6 +647,7 @@ where
return
}

let is_synced = self.is_synced();
let is_best = self.client.info().best_hash == hash;
log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");

Expand All @@ -641,6 +661,7 @@ where
log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
let message = BlockAnnounce {
header: header.clone(),
is_synced,
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
data: Some(data.clone()),
};
Expand Down Expand Up @@ -761,6 +782,7 @@ where
*peer_id,
peer.info.best_hash,
peer.info.best_number,
peer.info.is_synced,
))
});
self.strategy.switch_to_next(
Expand Down Expand Up @@ -849,6 +871,7 @@ where
number,
hash,
self.genesis_hash,
self.is_synced(),
)
.encode(),
);
Expand Down Expand Up @@ -1140,6 +1163,7 @@ where
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number,
is_synced: status.is_synced,
},
known_blocks: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
Expand All @@ -1149,7 +1173,12 @@ where

// Only forward full peers to syncing strategy.
if status.roles.is_full() {
self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
self.strategy.add_peer(
peer_id,
peer.info.best_hash,
peer.info.best_number,
peer.info.is_synced,
);
}

log::debug!(target: LOG_TARGET, "Connected {peer_id}");
Expand Down Expand Up @@ -1383,6 +1412,7 @@ where
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
force_synced: bool,
) -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
Expand All @@ -1406,6 +1436,7 @@ where
best_number,
best_hash,
genesis_hash,
force_synced,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
Expand Down
1 change: 1 addition & 0 deletions substrate/client/network/sync/src/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ mod tests {
best_hash: Hash::random(),
best_number: u64::arbitrary(g),
state: ArbitraryPeerSyncState::arbitrary(g).0,
is_synced: bool::arbitrary(g),
};
ArbitraryPeerSync(ps)
}
Expand Down
33 changes: 22 additions & 11 deletions substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,20 @@ where
}

/// Notify that a new peer has connected.
pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
pub fn add_peer(
&mut self,
peer_id: PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>,
is_synced: bool,
) {
match self {
SyncingStrategy::WarpSyncStrategy(strategy) =>
strategy.add_peer(peer_id, best_hash, best_number),
SyncingStrategy::StateSyncStrategy(strategy) =>
strategy.add_peer(peer_id, best_hash, best_number),
SyncingStrategy::ChainSyncStrategy(strategy) =>
strategy.add_peer(peer_id, best_hash, best_number),
strategy.add_peer(peer_id, best_hash, best_number, is_synced),
}
}

Expand All @@ -167,7 +173,7 @@ where
is_best: bool,
peer_id: PeerId,
announce: &BlockAnnounce<B::Header>,
) -> Option<(B::Hash, NumberFor<B>)> {
) -> Option<(B::Hash, NumberFor<B>, bool)> {
match self {
SyncingStrategy::WarpSyncStrategy(strategy) =>
strategy.on_validated_block_announce(is_best, peer_id, announce),
Expand Down Expand Up @@ -404,7 +410,7 @@ where
&mut self,
config: SyncingConfig,
client: Arc<Client>,
connected_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
connected_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>, bool)>,
) -> Result<(), ClientError> {
match self {
Self::WarpSyncStrategy(warp_sync) => {
Expand All @@ -421,8 +427,11 @@ where
res.target_justifications,
// skip proofs, only set to `true` in `FastUnsafe` sync mode
false,
connected_peers
.map(|(peer_id, _best_hash, best_number)| (peer_id, best_number)),
connected_peers.map(
|(peer_id, _best_hash, best_number, _is_synced)| {
(peer_id, best_number)
},
),
);

*self = Self::StateSyncStrategy(state_sync);
Expand All @@ -448,8 +457,8 @@ where
};
// Let `ChainSync` know about connected peers.
connected_peers.into_iter().for_each(
|(peer_id, best_hash, best_number)| {
chain_sync.add_peer(peer_id, best_hash, best_number)
|(peer_id, best_hash, best_number, is_synced)| {
chain_sync.add_peer(peer_id, best_hash, best_number, is_synced)
},
);

Expand Down Expand Up @@ -478,9 +487,11 @@ where
},
};
// Let `ChainSync` know about connected peers.
connected_peers.into_iter().for_each(|(peer_id, best_hash, best_number)| {
chain_sync.add_peer(peer_id, best_hash, best_number)
});
connected_peers.into_iter().for_each(
|(peer_id, best_hash, best_number, is_synced)| {
chain_sync.add_peer(peer_id, best_hash, best_number, is_synced)
},
);

*self = Self::ChainSyncStrategy(chain_sync);
},
Expand Down
Loading

0 comments on commit df7768a

Please sign in to comment.