Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Snapshot sync improvements #2960

Merged
merged 4 commits into from
Oct 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Default for SyncConfig {
network_id: U256::from(1),
subprotocol_name: *b"eth",
fork_block: None,
warp_sync: true,
warp_sync: false,
}
}
}
Expand Down
186 changes: 143 additions & 43 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
const MAX_TRANSACTION_SIZE: usize = 300*1024;
// Min number of blocks to be behind for a snapshot sync
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 100000;
const SNAPSHOT_MIN_PEERS: usize = 3;

const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
Expand All @@ -147,21 +148,27 @@ const SNAPSHOT_DATA_PACKET: u8 = 0x14;

pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;

const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 10f64;
const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 60f64;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;

const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
const STATUS_TIMEOUT_SEC: u64 = 5;
const HEADERS_TIMEOUT_SEC: u64 = 15;
const BODIES_TIMEOUT_SEC: u64 = 10;
const RECEIPTS_TIMEOUT_SEC: u64 = 10;
const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
pub enum SyncState {
/// Waiting for pv64 peers to start snapshot syncing
/// Collecting enough peers to start syncing.
WaitingPeers,
/// Waiting for snapshot manifest download
SnapshotManifest,
/// Downloading snapshot data
SnapshotData,
/// Waiting for snapshot restoration to complete
/// Waiting for snapshot restoration progress.
SnapshotWaiting,
/// Downloading new blocks
Blocks,
Expand Down Expand Up @@ -276,7 +283,7 @@ struct PeerInfo {
/// Holds requested snapshot chunk hash if any.
asking_snapshot_data: Option<H256>,
/// Request timestamp
ask_time: f64,
ask_time: u64,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet<H256>,
/// Pending request is expired and result should be ignored
Expand Down Expand Up @@ -324,10 +331,13 @@ pub struct ChainSync {
network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
/// Snapshot sync allowed.
snapshot_sync_enabled: bool,
/// Snapshot downloader.
snapshot: Snapshot,
/// Connected peers pending Status message.
/// Value is request timestamp.
handshaking_peers: HashMap<PeerId, u64>,
/// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<u64>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -337,20 +347,21 @@ impl ChainSync {
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
let chain_info = chain.chain_info();
let mut sync = ChainSync {
state: SyncState::Idle,
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
starting_block: chain.chain_info().best_block_number,
highest_block: None,
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
active_peers: HashSet::new(),
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
old_blocks: None,
last_sent_block_number: 0,
network_id: config.network_id,
fork_block: config.fork_block,
snapshot_sync_enabled: config.warp_sync,
snapshot: Snapshot::new(),
sync_start_time: None,
};
sync.init_downloaders(chain);
sync.update_targets(chain);
sync
}

Expand Down Expand Up @@ -442,20 +453,67 @@ impl ChainSync {
self.active_peers.remove(&peer_id);
}

fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
if self.state != SyncState::WaitingPeers {
return;
}
let best_block = io.chain().chain_info().best_block_number;

let (best_hash, max_peers, snapshot_peers) = {
//collect snapshot infos from peers
let snapshots = self.peers.iter()
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn| best_block < sn && (sn - best_block) > SNAPSHOT_RESTORE_THRESHOLD))
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())));

let mut snapshot_peers = HashMap::new();
let mut max_peers: usize = 0;
let mut best_hash = None;
for (p, hash) in snapshots {
let peers = snapshot_peers.entry(hash).or_insert_with(Vec::new);
peers.push(*p);
if peers.len() > max_peers {
max_peers = peers.len();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there's a tie here it should be weighted by the best block number so you don't get very stale snapshots.

best_hash = Some(hash);
}
}
(best_hash, max_peers, snapshot_peers)
};

let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);

if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
if max_peers >= SNAPSHOT_MIN_PEERS {
trace!(target: "sync", "Starting confirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
} else if timeout {
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
}
} else if timeout {
trace!(target: "sync", "No snapshots found, starting full sync");
self.state = SyncState::Idle;
self.continue_sync(io);
}
}

fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
self.snapshot.clear();
self.request_snapshot_manifest(io, peer_id);
for p in peers {
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
self.request_snapshot_manifest(io, *p);
}
}
self.state = SyncState::SnapshotManifest;
}

/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
pub fn restart(&mut self, io: &mut SyncIo) {
self.init_downloaders(io.chain());
self.update_targets(io.chain());
self.reset_and_continue(io);
}

/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
fn init_downloaders(&mut self, chain: &BlockChainClient) {
/// Update sync after the blockchain has been changed externally.
pub fn update_targets(&mut self, chain: &BlockChainClient) {
// Do not assume that the block queue/chain still has our last_imported_block
let chain = chain.chain_info();
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
Expand All @@ -475,6 +533,7 @@ impl ChainSync {

/// Called by peer to report status
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
self.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = try!(r.val_at(0));
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
let peer = PeerInfo {
Expand All @@ -486,7 +545,7 @@ impl ChainSync {
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
ask_time: 0,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
Expand All @@ -496,7 +555,12 @@ impl ChainSync {
block_set: None,
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if self.sync_start_time.is_none() {
self.sync_start_time = Some(time::precise_time_ns());
}

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
Expand Down Expand Up @@ -578,7 +642,7 @@ impl ChainSync {
}
let item_count = r.item_count();
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, self.state, block_set);
if self.state == SyncState::Idle && self.old_blocks.is_none() {
if (self.state == SyncState::Idle || self.state == SyncState::WaitingPeers) && self.old_blocks.is_none() {
trace!(target: "sync", "Ignored unexpected block headers");
self.continue_sync(io);
return Ok(());
Expand Down Expand Up @@ -875,7 +939,7 @@ impl ChainSync {
}
self.clear_peer_download(peer_id);
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
self.continue_sync(io);
return Ok(());
}
Expand Down Expand Up @@ -918,7 +982,7 @@ impl ChainSync {
match io.snapshot_service().status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
self.state = SyncState::Idle;
self.state = SyncState::WaitingPeers;
self.snapshot.clear();
self.continue_sync(io);
return Ok(());
Expand Down Expand Up @@ -960,6 +1024,7 @@ impl ChainSync {
/// Called by peer when it is disconnecting
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
self.handshaking_peers.remove(&peer);
if self.peers.contains_key(&peer) {
debug!(target: "sync", "Disconnected {}", peer);
self.clear_peer_download(peer);
Expand All @@ -975,12 +1040,14 @@ impl ChainSync {
if let Err(e) = self.send_status(io, peer) {
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer);
} else {
self.handshaking_peers.insert(peer, time::precise_time_ns());
}
}

/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
if (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks || self.state == SyncState::Idle)
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {
self.complete_sync(io);
}
Expand Down Expand Up @@ -1040,11 +1107,9 @@ impl ChainSync {
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
match self.state {
SyncState::Idle if self.snapshot_sync_enabled
&& chain_info.best_block_number < peer_snapshot_number
&& (peer_snapshot_number - chain_info.best_block_number) > SNAPSHOT_RESTORE_THRESHOLD => {
trace!(target: "sync", "Starting snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
self.start_snapshot_sync(io, peer_id);
SyncState::WaitingPeers => {
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
self.maybe_start_snapshot_sync(io);
},
SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().queue_info().is_full() {
Expand All @@ -1070,6 +1135,13 @@ impl ChainSync {
}
},
SyncState::SnapshotData => {
if let RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } = io.snapshot_service().status() {
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
self.state = SyncState::SnapshotWaiting;
return;
}
}
if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
self.request_snapshot_data(io, peer_id);
}
Expand Down Expand Up @@ -1253,7 +1325,7 @@ impl ChainSync {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
}
peer.asking = asking;
peer.ask_time = time::precise_time_s();
peer.ask_time = time::precise_time_ns();
let result = if packet_id >= ETH_PACKET_COUNT {
sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
} else {
Expand Down Expand Up @@ -1590,17 +1662,18 @@ impl ChainSync {

#[cfg_attr(feature="dev", allow(match_same_arms))]
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
let tick = time::precise_time_s();
let tick = time::precise_time_ns();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
let elapsed = (tick - peer.ask_time) / 1_000_000_000;
let timeout = match peer.asking {
PeerAsking::BlockHeaders => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::BlockReceipts => (tick - peer.ask_time) > RECEIPTS_TIMEOUT_SEC,
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
Expand All @@ -1611,16 +1684,42 @@ impl ChainSync {
for p in aborting {
self.on_peer_aborting(io, p);
}

// Check for handshake timeouts
for (peer, ask_time) in &self.handshaking_peers {
let elapsed = (tick - ask_time) / 1_000_000_000;
if elapsed > STATUS_TIMEOUT_SEC {
trace!(target:"sync", "Status timeout {}", peer);
io.disconnect_peer(*peer);
}
}
}

fn check_resume(&mut self, io: &mut SyncIo) {
if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
self.state = SyncState::Blocks;
self.continue_sync(io);
} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
trace!(target:"sync", "Snapshot restoration is complete");
self.restart(io);
self.continue_sync(io);
} else if self.state == SyncState::SnapshotWaiting {
match io.snapshot_service().status() {
RestorationStatus::Inactive => {
trace!(target:"sync", "Snapshot restoration is complete");
self.restart(io);
self.continue_sync(io);
},
RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
self.state = SyncState::SnapshotData;
self.continue_sync(io);
}
},
RestorationStatus::Failed => {
trace!(target: "sync", "Snapshot restoration aborted");
self.state = SyncState::WaitingPeers;
self.snapshot.clear();
self.continue_sync(io);
},
}
}
}

Expand Down Expand Up @@ -1828,6 +1927,7 @@ impl ChainSync {

/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.maybe_start_snapshot_sync(io);
self.check_resume(io);
}

Expand Down Expand Up @@ -2050,7 +2150,7 @@ mod tests {
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
ask_time: 0,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
Expand Down
Loading