This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Snapshot restoration overhaul #11219

Merged: 29 commits merged on Oct 31, 2019.
Commits (29) — changes from all commits
a2a3257
Comments and todos
dvdplm Oct 23, 2019
c543a46
Merge branch 'master' into dp/chore/ensure-we-ignore-old-snapshots
dvdplm Oct 24, 2019
86f0450
fix compilation
dvdplm Oct 24, 2019
e250df2
More todos, more logs
dvdplm Oct 24, 2019
5655285
Fix picking snapshot peer: prefer the one with the highest block number
dvdplm Oct 25, 2019
9e79bc0
Adjust WAIT_PEERS_TIMEOUT to be a multiple of MAINTAIN_SYNC_TIMER to …
dvdplm Oct 25, 2019
41f583c
Merge branch 'master' into dp/chore/ensure-we-ignore-old-snapshots
dvdplm Oct 28, 2019
ec6e4a5
Tabs
dvdplm Oct 28, 2019
74ce5e8
Formatting
dvdplm Oct 28, 2019
241675f
Don't build new rlp::EMPTY_LIST_RLP instances
dvdplm Oct 28, 2019
b8a1504
Dial down debug logging
dvdplm Oct 28, 2019
956e237
Don't warn about missing hashes in the manifest: it's normal
dvdplm Oct 28, 2019
f7c7a8a
Cleanup
dvdplm Oct 28, 2019
3ab9017
Do not skip snapshots further away than 30k block from the highest bl…
dvdplm Oct 28, 2019
34cef68
lockfile
dvdplm Oct 28, 2019
fff43cc
Add a `ChunkType::Dupe` variant so that we do not disconnect a peer i…
dvdplm Oct 28, 2019
f4ccf20
tweak log message
dvdplm Oct 28, 2019
decb18c
Don't warp sync twice
dvdplm Oct 29, 2019
d79a4f4
Avoid iterating over all snapshot block/state hashes to find the next…
dvdplm Oct 29, 2019
ebab778
Address review grumbles
dvdplm Oct 30, 2019
2e75fb4
Log correct number of bytes written to disk
dvdplm Oct 30, 2019
5970b80
Revert ChunkType::Dup change
dvdplm Oct 30, 2019
100ca6b
whitespace grumble
dvdplm Oct 30, 2019
bd9fc20
Cleanup debugging code
dvdplm Oct 30, 2019
29a26c9
Fix docs
dvdplm Oct 30, 2019
ac4cd10
Fix import and a typo
dvdplm Oct 30, 2019
81d46cb
Fix test impl
dvdplm Oct 30, 2019
d2c347a
Use `indexmap::IndexSet` to ensure chunk hashes are accessed in order
dvdplm Oct 31, 2019
1ebd119
Revert increased SNAPSHOT_MANIFEST_TIMEOUT: 5sec should be enough
dvdplm Oct 31, 2019
100 changes: 52 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion ethcore/snapshot/src/lib.rs
@@ -452,7 +452,6 @@ impl StateRebuilder {
StateDB::commit_bloom(&mut batch, bloom_journal)?;
self.db.inject(&mut batch)?;
backing.write_buffered(batch);
trace!(target: "snapshot", "current state root: {:?}", self.state_root);
Ok(())
}

4 changes: 2 additions & 2 deletions ethcore/snapshot/src/service.rs
@@ -161,6 +161,7 @@ impl Restoration {

if let Some(ref mut writer) = self.writer.as_mut() {
writer.write_state_chunk(hash, chunk)?;
trace!(target: "snapshot", "Wrote {}/{} bytes of state to db/disk. Current state root: {:?}", len, chunk.len(), self.state.state_root());
}

self.state_chunks_left.remove(&hash);
@@ -676,7 +677,6 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
} else if manifest.state_hashes.contains(&hash) {
true
} else {
warn!(target: "snapshot", "Hash of the content of {:?} not present in the manifest block/state hashes.", path);
Review thread on this line:

Collaborator: why removed?

dvdplm (author): Because it's not an abnormal condition, it's expected when we start syncing.

dvdplm (author): To elaborate on that: when we see that there was a partial restoration going on before us, we try to re-use the chunks we already have on disk. This warning looks scary to users, but it's not really something we need to worry about, I think: it just means we can't use the chunk because we're now working on a different snapshot. I can put it back as a trace! perhaps?

dvdplm (author): Is that a yes? Put the log back as a trace!?

Collaborator: either way is fine :)

return Ok(false);
};

@@ -788,7 +788,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
false => Ok(())
}
}
other => other.map(drop),
Err(e) => Err(e)
};
(res, db)
}
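The thread above concerns the check in this hunk: a chunk file left on disk by an earlier, interrupted restoration is only re-used if its content hash is listed in the manifest currently being restored. The sketch below is illustrative only — `Manifest`, `is_reusable_chunk`, and the plain byte-array hashes are invented names and simplifications, not the crate's actual types — and just restates the membership check and why a miss does not deserve a user-facing warning.

```rust
/// Illustrative sketch only, not the project's code. Real chunk hashes are
/// keccak digests; plain byte arrays stand in for them here.
type ChunkHash = [u8; 32];

struct Manifest {
    block_hashes: Vec<ChunkHash>,
    state_hashes: Vec<ChunkHash>,
}

/// A leftover chunk is only worth keeping if the manifest we are restoring
/// now actually lists its hash. A miss is expected whenever the previous,
/// aborted restoration targeted a different snapshot, so a quiet log level
/// is enough when this returns false.
fn is_reusable_chunk(manifest: &Manifest, hash: &ChunkHash) -> bool {
    manifest.block_hashes.contains(hash) || manifest.state_hashes.contains(hash)
}

fn main() {
    let manifest = Manifest {
        block_hashes: vec![[1u8; 32]],
        state_hashes: vec![[2u8; 32]],
    };
    assert!(is_reusable_chunk(&manifest, &[2u8; 32]));
    // Leftover from a different snapshot: silently skipped, not warned about.
    assert!(!is_reusable_chunk(&manifest, &[9u8; 32]));
}
```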
3 changes: 1 addition & 2 deletions ethcore/snapshot/src/traits.rs
@@ -51,8 +51,7 @@ pub trait SnapshotService : Sync + Send {
fn status(&self) -> RestorationStatus;

/// Begin snapshot restoration.
/// If restoration in-progress, this will reset it.
/// From this point on, any previous snapshot may become unavailable.
/// If a restoration is in progress, this will reset it and clear all data.
fn begin_restore(&self, manifest: ManifestData);

/// Abort an in-progress restoration if there is one.
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
@@ -19,6 +19,7 @@ ethcore-private-tx = { path = "../private-tx" }
ethereum-types = "0.8.0"
fastmap = { path = "../../util/fastmap" }
futures = "0.1"
indexmap = "1.3.0"
keccak-hash = "0.4.0"
light = { package = "ethcore-light", path = "../light" }
log = "0.4"
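The new `indexmap` dependency backs commit d2c347a ("Use `indexmap::IndexSet` to ensure chunk hashes are accessed in order"). The snippet below is only a minimal illustration of the property being relied on — an `IndexSet` iterates in insertion order, whereas a `HashSet` guarantees no order — with made-up strings standing in for the manifest's chunk hashes.

```rust
// Minimal illustration of why IndexSet was chosen; the "hashes" are made up.
use indexmap::IndexSet;
use std::collections::HashSet;

fn main() {
    // Pretend these are chunk hashes in the order the manifest lists them.
    let manifest_order = ["c3", "a1", "b2"];

    // IndexSet keeps insertion order, so chunks are handed out manifest-first.
    let ordered: IndexSet<&str> = manifest_order.iter().copied().collect();
    assert!(ordered.iter().copied().eq(manifest_order.iter().copied()));

    // A plain HashSet makes no ordering guarantee at all.
    let unordered: HashSet<&str> = manifest_order.iter().copied().collect();
    println!("IndexSet order: {:?}", ordered);
    println!("HashSet order:  {:?}", unordered);
}
```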
2 changes: 1 addition & 1 deletion ethcore/sync/src/api.rs
@@ -295,7 +295,7 @@ pub struct EthSync {
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
/// for state tracking
/// Track the sync state: are we importing or verifying blocks?
is_major_syncing: Arc<AtomicBool>
}

2 changes: 1 addition & 1 deletion ethcore/sync/src/block_sync.rs
@@ -309,7 +309,7 @@ impl BlockDownloader {
}
}
}

// Update the highest block number seen on the network from the header.
if let Some((number, _)) = last_header {
if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number);
62 changes: 33 additions & 29 deletions ethcore/sync/src/chain/handler.rs
@@ -43,7 +43,7 @@ use ethereum_types::{H256, U256};
use keccak_hash::keccak;
use network::PeerId;
use network::client_version::ClientVersion;
use log::{debug, trace, error};
use log::{debug, trace, error, warn};
use rlp::Rlp;
use common_types::{
BlockNumber,
@@ -76,14 +76,14 @@ impl SyncHandler {
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
PrivateStatePacket => SyncHandler::on_private_state_data(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
Ok(())
}
};

match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
trace!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
@@ -96,7 +96,7 @@ impl SyncHandler {
},
}
} else {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
}
}

@@ -117,14 +117,14 @@ impl SyncHandler {
sync.active_peers.remove(&peer_id);

if sync.state == SyncState::SnapshotManifest {
// Check if we are asking other peers for
// the snapshot manifest as well.
// If not, return to initial state
let still_asking_manifest = sync.peers.iter()
// Check if we are asking other peers for a snapshot manifest as well. If not,
// set our state to initial state (`Idle` or `WaitingPeers`).
let still_seeking_manifest = sync.peers.iter()
.filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
.next().is_none();
.next().is_some();

if still_asking_manifest {
if !still_seeking_manifest {
warn!(target: "snapshot_sync", "The peer we were downloading a snapshot from ({}) went away. Retrying.", peer_id);
sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
}
}
@@ -371,18 +371,18 @@ impl SyncHandler {
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);

if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) {
debug!(target: "sync", "{}: Ignored unexpected headers", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
return Ok(());
}
let expected_hash = match expected_hash {
Some(hash) => hash,
None => {
debug!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
return Ok(());
}
};
if !allowed {
debug!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
return Ok(());
}

@@ -466,12 +466,12 @@ impl SyncHandler {
/// Called when snapshot manifest is downloaded from a peer.
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
trace!(target: "snapshot_sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
return Ok(());
}
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest {
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
trace!(target: "snapshot_sync", "{}: Ignored unexpected/expired manifest", peer_id);
return Ok(());
}

@@ -482,10 +482,12 @@
.map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h);

if !is_supported_version {
trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
warn!(target: "snapshot_sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
return Err(DownloaderImportError::Invalid);
}
sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
debug!(target: "snapshot_sync", "{}: Peer sent a snapshot manifest we can use. Block number #{}, block chunks: {}, state chunks: {}",
peer_id, manifest.block_number, manifest.block_hashes.len(), manifest.state_hashes.len());
io.snapshot_service().begin_restore(manifest);
sync.state = SyncState::SnapshotData;

Expand All @@ -495,56 +497,56 @@ impl SyncHandler {
/// Called when snapshot data is downloaded from a peer.
fn on_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
trace!(target: "snapshot_sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
return Ok(());
}
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) {
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
trace!(target: "snapshot_sync", "{}: Ignored unexpected snapshot data", peer_id);
return Ok(());
}

// check service status
let status = io.snapshot_service().status();
match status {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration status: {:?}", peer_id, status);
sync.state = SyncState::WaitingPeers;

// only note bad if restoration failed.
if let (Some(hash), RestorationStatus::Failed) = (sync.snapshot.snapshot_hash(), status) {
trace!(target: "sync", "Noting snapshot hash {} as bad", hash);
debug!(target: "snapshot_sync", "Marking snapshot manifest hash {} as bad", hash);
sync.snapshot.note_bad(hash);
}

sync.snapshot.clear();
return Ok(());
},
RestorationStatus::Initializing { .. } => {
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration is initializing. Can't accept data right now.", peer_id);
return Ok(());
}
RestorationStatus::Finalizing => {
trace!(target: "warp", "{}: Snapshot finalizing restoration", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot finalizing restoration. Can't accept data right now.", peer_id);
return Ok(());
}
RestorationStatus::Ongoing { .. } => {
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration is ongoing", peer_id);
},
}

let snapshot_data: Bytes = r.val_at(0)?;
match sync.snapshot.validate_chunk(&snapshot_data) {
Ok(ChunkType::Block(hash)) => {
trace!(target: "sync", "{}: Processing block chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Processing block chunk", peer_id);
io.snapshot_service().restore_block_chunk(hash, snapshot_data);
}
Ok(ChunkType::State(hash)) => {
trace!(target: "sync", "{}: Processing state chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Processing state chunk", peer_id);
io.snapshot_service().restore_state_chunk(hash, snapshot_data);
}
Err(()) => {
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Got bad snapshot chunk", peer_id);
io.disconnect_peer(peer_id);
return Ok(());
}
@@ -566,7 +568,7 @@ impl SyncHandler {
let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let peer = PeerInfo {
protocol_version: protocol_version,
protocol_version,
network_id: r.val_at(1)?,
difficulty: Some(r.val_at(2)?),
latest_hash: r.val_at(3)?,
@@ -595,15 +597,17 @@ impl SyncHandler {
latest:{}, \
genesis:{}, \
snapshot:{:?}, \
private_tx_enabled:{})",
private_tx_enabled:{}, \
client_version: {})",
peer_id,
peer.protocol_version,
peer.network_id,
peer.difficulty,
peer.latest_hash,
peer.genesis,
peer.snapshot_number,
peer.private_tx_enabled
peer.private_tx_enabled,
peer.client_version,
);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_version(peer_id));