Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Move block/state/warpc sync requests/responses to ChainSync #12739

Merged
merged 6 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 33 additions & 48 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_consensus::{
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{any::Any, fmt, fmt::Formatter, task::Poll};
use warp::{EncodedProof, WarpProofRequest, WarpSyncProgress};
use warp::WarpSyncProgress;

/// The sync status of a peer we are trying to sync with
#[derive(Debug)]
Expand Down Expand Up @@ -123,7 +125,7 @@ pub enum OnBlockJustification<Block: BlockT> {
},
}

/// Result of [`ChainSync::on_state_data`].
/// Result of `ChainSync::on_state_data`.
#[derive(Debug)]
pub enum OnStateData<Block: BlockT> {
/// The block and state that should be imported.
Expand All @@ -132,6 +134,20 @@ pub enum OnStateData<Block: BlockT> {
Continue,
}

/// Block or justification request polled from `ChainSync`
#[derive(Debug)]
pub enum ImportResult<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
}

/// Value polled from `ChainSync`
#[derive(Debug)]
pub enum PollResult<B: BlockT> {
Import(ImportResult<B>),
Announce(PollBlockAnnounceValidation<B::Header>),
}

/// Result of [`ChainSync::poll_block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollBlockAnnounceValidation<H> {
Expand Down Expand Up @@ -186,6 +202,13 @@ pub struct Metrics {
pub justifications: metrics::Metrics,
}

#[derive(Debug)]
pub enum PeerRequest<B: BlockT> {
Block(BlockRequest<B>),
State,
WarpProof,
}

/// Wrapper for implementation-specific state request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
Expand Down Expand Up @@ -250,6 +273,9 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Returns the current number of peers stored within this state machine.
fn num_peers(&self) -> usize;

/// Returns the number of peers we're connected to and that are being queried.
fn num_active_peers(&self) -> usize;

/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
Expand Down Expand Up @@ -277,22 +303,6 @@ pub trait ChainSync<Block: BlockT>: Send {
number: NumberFor<Block>,
);

/// Get an iterator over all scheduled justification requests.
fn justification_requests<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<Block>)> + 'a>;

/// Get an iterator over all block requests of all peers.
fn block_requests<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<Block>)> + 'a>;

/// Get a state request, if any.
fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)>;

/// Get a warp sync request, if any.
fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<Block>)>;

/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand All @@ -307,16 +317,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Handle a response from the remote to a state request that we made.
fn on_state_data(
&mut self,
who: &PeerId,
response: OpaqueStateResponse,
) -> Result<OnStateData<Block>, BadPeer>;

/// Handle a response from the remote to a warp proof request that we made.
fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer>;

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand Down Expand Up @@ -383,35 +383,20 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Return some key metrics.
fn metrics(&self) -> Metrics;

/// Create implementation-specific block request.
fn create_opaque_block_request(&self, request: &BlockRequest<Block>) -> OpaqueBlockRequest;

/// Encode implementation-specific block request.
fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String>;

/// Decode implementation-specific block response.
fn decode_block_response(&self, response: &[u8]) -> Result<OpaqueBlockResponse, String>;

/// Access blocks from implementation-specific block response.
fn block_response_into_blocks(
&self,
request: &BlockRequest<Block>,
response: OpaqueBlockResponse,
) -> Result<Vec<BlockData<Block>>, String>;

/// Encode implementation-specific state request.
fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String>;

/// Decode implementation-specific state response.
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;

/// Advance the state of `ChainSync`
///
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;

/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
}
51 changes: 1 addition & 50 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use sc_network_common::{
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
sync::{warp::WarpProofRequest, OpaqueBlockRequest, OpaqueStateRequest},
};
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -163,36 +162,6 @@ pub enum BehaviourOut<B: BlockT> {
messages: Vec<(ProtocolName, Bytes)>,
},

/// A new block request must be emitted.
BlockRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific block request.
request: OpaqueBlockRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// A new state request must be emitted.
StateRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific state request.
request: OpaqueStateRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// A new warp sync request must be emitted.
WarpSyncRequest {
/// Node we send the request to.
target: PeerId,
/// Warp sync request.
request: WarpProofRequest<B>,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// Now connected to a new peer for syncing purposes.
SyncConnected(PeerId),

Expand Down Expand Up @@ -230,21 +199,9 @@ where
user_agent: String,
local_public_key: PublicKey,
disco_config: DiscoveryConfig,
block_request_protocol_config: ProtocolConfig,
state_request_protocol_config: ProtocolConfig,
warp_sync_protocol_config: Option<ProtocolConfig>,
light_client_request_protocol_config: ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<ProtocolConfig>,
request_response_protocols: Vec<ProtocolConfig>,
peerset: PeersetHandle,
) -> Result<Self, request_responses::RegisterError> {
if let Some(config) = warp_sync_protocol_config {
request_response_protocols.push(config);
}
request_response_protocols.push(block_request_protocol_config);
request_response_protocols.push(state_request_protocol_config);
request_response_protocols.push(light_client_request_protocol_config);

Ok(Self {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
Expand Down Expand Up @@ -356,12 +313,6 @@ impl<B: BlockT> From<CustomMessageOutcome<B>> for BehaviourOut<B> {
BehaviourOut::BlockImport(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
BehaviourOut::JustificationImport(origin, hash, nb, justification),
CustomMessageOutcome::BlockRequest { target, request, pending_response } =>
BehaviourOut::BlockRequest { target, request, pending_response },
CustomMessageOutcome::StateRequest { target, request, pending_response } =>
BehaviourOut::StateRequest { target, request, pending_response },
CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } =>
BehaviourOut::WarpSyncRequest { target, request, pending_response },
CustomMessageOutcome::NotificationStreamOpened {
remote,
protocol,
Expand Down
33 changes: 0 additions & 33 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,39 +101,6 @@ where
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// Request response configuration for the block request protocol.
///
/// [`RequestResponseConfig::name`] is used to tag outgoing block requests with the correct
/// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming
/// block requests, if enabled.
///
/// Can be constructed either via
/// `sc_network_sync::block_request_handler::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via `sc_network_sync::block_request_handler::
/// BlockRequestHandler::new` allowing both outgoing and incoming requests.
pub block_request_protocol_config: RequestResponseConfig,

/// Request response configuration for the light client request protocol.
///
/// Can be constructed either via
/// `sc_network_light::light_client_requests::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via
/// `sc_network_light::light_client_requests::handler::LightClientRequestHandler::new`
/// allowing both outgoing and incoming requests.
pub light_client_request_protocol_config: RequestResponseConfig,

/// Request response configuration for the state request protocol.
///
/// Can be constructed either via
/// `sc_network_sync::state_request_handler::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via
/// `sc_network_sync::state_request_handler::StateRequestHandler::new` allowing
/// both outgoing and incoming requests.
pub state_request_protocol_config: RequestResponseConfig,

/// Optional warp sync protocol config.
pub warp_sync_protocol_config: Option<RequestResponseConfig>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
Loading