Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Snapshot restoration overhaul (#11219)
Browse files Browse the repository at this point in the history
* Comments and todos
Use `snapshot_sync` as logging target

* fix compilation

* More todos, more logs

* Fix picking snapshot peer: prefer the one with the highest block number
More docs, comments, todos

* Adjust WAIT_PEERS_TIMEOUT to be a multiple of MAINTAIN_SYNC_TIMER to try to fix snapshot startup problems
Docs, todos, comments

* Tabs

* Formatting

* Don't build new rlp::EMPTY_LIST_RLP instances

* Dial down debug logging

* Don't warn about missing hashes in the manifest: it's normal
Log client version on peer connect

* Cleanup

* Do not skip snapshots further away than 30k block from the highest block seen

Currently we look for peers that seed snapshots that are close to the highest block seen on the network (where "close" means withing 30k blocks). When a node starts up we wait for some time (5sec, increased here to 10sec) to let peers connect and if we have found a suitable peer to sync a snapshot from at the end of that delay, we start the download; if none is found and --warp-barrier is used we stall, otherwise we start a slow-sync.
When looking for a suitable snapshot, we use the highest block seen on the network to check if a peer has a snapshot that is within 30k blocks of that highest block number. This means that in a situation where all available snapshots are older than that, we will often fail to start a snapshot at all. What's worse is that the longer we delay starting a snapshot sync (to let more peers connect, in the hope of finding a good snapshot), the more likely we are to have seen a high block and thus the more likely we become to accept a snapshot.
This commit removes this comparison with the highest blocknumber criteria entirely and picks the best snapshot we find in 10sec.

* lockfile

* Add a `ChunkType::Dupe` variant so that we do not disconnect a peer if they happen to send us a duplicate chunk (just ignore the chunk and keep going)
Resolve some documentation todos, add more

* tweak log message

* Don't warp sync twice
Check if our own block is beyond the given warp barrier (can happen after we've completed a warp sync but are not quite yet synced up to the tip) and if so, don't sync.
More docs, resolve todos.
Dial down some `sync` debug level logging to trace

* Avoid iterating over all snapshot block/state hashes to find the next work item

Use a HashSet instead of a Vec and remove items from the set as chunks are processed. Calculate and store the total number of chunks in the `Snapshot`  struct instead of counting pending chunks each time.

* Address review grumbles

* Log correct number of bytes written to disk

* Revert ChunkType::Dup change

* whitespace grumble

* Cleanup debugging code

* Fix docs

* Fix import and a typo

* Fix test impl

* Use `indexmap::IndexSet` to ensure chunk hashes are accessed in order

* Revert increased SNAPSHOT_MANIFEST_TIMEOUT: 5sec should be enough
  • Loading branch information
dvdplm authored and niklasad1 committed Nov 7, 2019
1 parent cd5e43d commit e512bf4
Show file tree
Hide file tree
Showing 17 changed files with 793 additions and 296 deletions.
82 changes: 43 additions & 39 deletions Cargo.lock

Large diffs are not rendered by default.

162 changes: 162 additions & 0 deletions ethcore/snapshot/src/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.

// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::sync::{Arc, atomic::AtomicBool};

use blockchain::{BlockChain, BlockChainDB};
use bytes::Bytes;
use client_traits::{BlockChainClient, BlockInfo, DatabaseRestore, BlockChainReset};
use common_types::{
ids::BlockId,
errors::{EthcoreError as Error, SnapshotError},
snapshot::{ManifestData, ChunkSink, Progress, RestorationStatus},
};
use engine::Engine;
use ethereum_types::H256;
use parking_lot::RwLock;

use crate::io::SnapshotWriter;

/// The interface for a snapshot network service.
/// This handles:
/// - restoration of snapshots to temporary databases.
/// - responding to queries for snapshot manifests and chunks
pub trait SnapshotService : Sync + Send {
/// Query the most recent manifest data.
fn manifest(&self) -> Option<ManifestData>;

/// Get the supported range of snapshot version numbers.
/// `None` indicates warp sync isn't supported by the consensus engine.
fn supported_versions(&self) -> Option<(u64, u64)>;

/// Returns a list of the completed chunks
fn completed_chunks(&self) -> Option<Vec<H256>>;

/// Get raw chunk for a given hash.
fn chunk(&self, hash: H256) -> Option<Bytes>;

/// Ask the snapshot service for the restoration status.
fn status(&self) -> RestorationStatus;

/// Begin snapshot restoration.
/// If a restoration is in progress, this will reset it and clear all data.
fn begin_restore(&self, manifest: ManifestData);

/// Abort an in-progress restoration if there is one.
fn abort_restore(&self);

/// Feed a raw state chunk to the service to be processed asynchronously.
/// no-op if not currently restoring.
fn restore_state_chunk(&self, hash: H256, chunk: Bytes);

/// Feed a raw block chunk to the service to be processed asynchronously.
/// no-op if currently restoring.
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);

/// Abort in-progress snapshotting if there is one.
fn abort_snapshot(&self);

/// Shutdown the Snapshot Service by aborting any ongoing restore
fn shutdown(&self);
}

/// Restore from secondary snapshot chunks.
pub trait Rebuilder: Send {
/// Feed a chunk, potentially out of order.
///
/// Check `abort_flag` periodically while doing heavy work. If set to `false`, should bail with
/// `Error::RestorationAborted`.
fn feed(
&mut self,
chunk: &[u8],
engine: &dyn Engine,
abort_flag: &AtomicBool,
) -> Result<(), Error>;

/// Finalize the restoration. Will be done after all chunks have been
/// fed successfully.
///
/// This should apply the necessary "glue" between chunks,
/// and verify against the restored state.
fn finalize(&mut self) -> Result<(), Error>;
}

/// Components necessary for snapshot creation and restoration.
pub trait SnapshotComponents: Send {
/// Create secondary snapshot chunks; these corroborate the state data
/// in the state chunks.
///
/// Chunks shouldn't exceed the given preferred size, and should be fed
/// uncompressed into the sink.
///
/// This will vary by consensus engine, so it's exposed as a trait.
fn chunk_all(
&mut self,
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError>;

/// Create a rebuilder, which will have chunks fed into it in arbitrary
/// order and then be finalized.
///
/// The manifest, a database, and fresh `BlockChain` are supplied.
///
/// The engine passed to the `Rebuilder` methods will be the same instance
/// that created the `SnapshotComponents`.
fn rebuilder(
&self,
chain: BlockChain,
db: Arc<dyn BlockChainDB>,
manifest: &ManifestData,
) -> Result<Box<dyn Rebuilder>, Error>;

/// Minimum supported snapshot version number.
fn min_supported_version(&self) -> u64;

/// Current version number
fn current_version(&self) -> u64;
}

/// Snapshot related functionality
pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + BlockChainReset {
/// Take a snapshot at the given block.
/// If the BlockId is 'Latest', this will default to 1000 blocks behind.
fn take_snapshot<W: SnapshotWriter + Send>(
&self,
writer: W,
at: BlockId,
p: &RwLock<Progress>,
) -> Result<(), Error>;
}

/// Helper trait for broadcasting a block to take a snapshot at.
pub trait Broadcast: Send + Sync {
/// Start a snapshot from the given block number.
fn request_snapshot_at(&self, num: u64);
}


/// Helper trait for transforming hashes to block numbers and checking if syncing.
pub trait Oracle: Send + Sync {
/// Maps a block hash to a block number
fn to_number(&self, hash: H256) -> Option<u64>;

/// Are we currently syncing?
fn is_major_importing(&self) -> bool;
}
1 change: 0 additions & 1 deletion ethcore/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ impl StateRebuilder {
StateDB::commit_bloom(&mut batch, bloom_journal)?;
self.db.inject(&mut batch)?;
backing.write_buffered(batch);
trace!(target: "snapshot", "current state root: {:?}", self.state_root);
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion ethcore/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl Restoration {

if let Some(ref mut writer) = self.writer.as_mut() {
writer.write_state_chunk(hash, chunk)?;
trace!(target: "snapshot", "Wrote {}/{} bytes of state to db/disk. Current state root: {:?}", len, chunk.len(), self.state.state_root());
}

self.state_chunks_left.remove(&hash);
Expand Down Expand Up @@ -767,7 +768,7 @@ impl Service {
false => Ok(())
}
}
other => other.map(drop),
Err(e) => Err(e)
};
(res, db)
}
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ hash-db = "0.12.4"
keccak-hash = "0.2.0"
keccak-hasher = { path = "../../util/keccak-hasher" }
kvdb = "0.1"
indexmap = "1.3.0"
log = "0.4"
macros = { path = "../../util/macros" }
parity-bytes = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ pub struct EthSync {
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
/// for state tracking
/// Track the sync state: are we importing or verifying blocks?
is_major_syncing: Arc<AtomicBool>
}

Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl BlockDownloader {
}
}
}

// Update the highest block number seen on the network from the header.
if let Some((number, _)) = last_header {
if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number);
Expand Down
Loading

0 comments on commit e512bf4

Please sign in to comment.