Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Add a way to signal shutdown to snapshotting threads #10744

Merged
merged 12 commits into from
Jun 19, 2019
11 changes: 9 additions & 2 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ use blockchain::{BlockChainDB, BlockChainDBHandler};
use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage};
use ethcore::miner::Miner;
use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus, Error as SnapshotError};
use ethcore::spec::Spec;
use ethcore::error::Error as EthcoreError;


use ethcore_private_tx::{self, Importer, Signer};
use Error;
Expand Down Expand Up @@ -197,6 +199,7 @@ impl ClientService {

/// Shutdown the Client Service
pub fn shutdown(&self) {
trace!(target: "shutdown", "Shutting down Client Service");
self.snapshot.shutdown();
}
}
Expand Down Expand Up @@ -257,7 +260,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {

let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || {
if let Err(e) = snapshot.take_snapshot(&*client, num) {
warn!("Failed to take snapshot at block #{}: {}", num, e);
match e {
EthcoreError::Snapshot(SnapshotError::SnapshotAborted) => info!("Snapshot aborted"),
_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
}

}
});

Expand Down
27 changes: 20 additions & 7 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,8 @@ impl Client {
liveness: AtomicBool::new(awake),
mode: Mutex::new(config.mode.clone()),
chain: RwLock::new(chain),
tracedb: tracedb,
engine: engine,
tracedb,
engine,
pruning: config.pruning.clone(),
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
Expand All @@ -778,8 +778,8 @@ impl Client {
ancient_blocks_import_lock: Default::default(),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
history: history,
factories,
history,
on_user_defaults_change: Mutex::new(None),
registrar_address,
exit_handler: Mutex::new(None),
Expand Down Expand Up @@ -1138,7 +1138,12 @@ impl Client {

/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> {
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(
&self,
writer: W,
at: BlockId,
p: &snapshot::Progress,
) -> Result<(), EthcoreError> {
let db = self.state_db.read().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let block_number = self.block_number(at).ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?;
Expand Down Expand Up @@ -1168,8 +1173,16 @@ impl Client {
};

let processing_threads = self.config.snapshot.processing_threads;
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?;

let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?;
snapshot::take_snapshot(
chunker,
&self.chain.read(),
start_hash,
db.as_hash_db(),
writer,
p,
processing_threads,
)?;
Ok(())
}

Expand Down
40 changes: 29 additions & 11 deletions ethcore/src/snapshot/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use hash_db::HashDB;
use rlp::{RlpStream, Rlp};
use snapshot::Error;
use snapshot::{Error, Progress};
use std::collections::HashSet;
use trie::{Trie, TrieMut};
use std::sync::atomic::Ordering;

// An empty account -- these were replaced with RLP null data for a space optimization in v1.
const ACC_EMPTY: BasicAccount = BasicAccount {
Expand Down Expand Up @@ -65,7 +66,15 @@ impl CodeState {
// walk the account's storage trie, returning a vector of RLP items containing the
// account address hash, account properties and the storage. Each item contains at most `max_storage_items`
// storage records split according to snapshot format definition.
pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>, first_chunk_size: usize, max_chunk_size: usize) -> Result<Vec<Bytes>, Error> {
pub fn to_fat_rlps(
account_hash: &H256,
acc: &BasicAccount,
acct_db: &AccountDB,
used_code: &mut HashSet<H256>,
first_chunk_size: usize,
max_chunk_size: usize,
p: &Progress,
) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?;
let mut chunks = Vec::new();
Expand Down Expand Up @@ -112,6 +121,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB,
}

loop {
if p.abort.load(Ordering::SeqCst) {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted);
}
match db_iter.next() {
Some(Ok((k, v))) => {
let pair = {
Expand Down Expand Up @@ -211,6 +224,7 @@ mod tests {
use types::basic_account::BasicAccount;
use test_helpers::get_temp_state_db;
use snapshot::tests::helpers::fill_storage;
use snapshot::Progress;

use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use ethereum_types::{H256, Address};
Expand All @@ -236,8 +250,8 @@ mod tests {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
Expand All @@ -262,7 +276,9 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
let p = Progress::default();

let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
Expand All @@ -287,7 +303,8 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap();
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
for rlp in fat_rlps {
Expand Down Expand Up @@ -319,20 +336,21 @@ mod tests {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: code_hash,
code_hash,
};

let account2 = BasicAccount {
nonce: 400.into(),
balance: 98765432123456789usize.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: code_hash,
code_hash,
};

let mut used_code = HashSet::new();

let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
let p1 = Progress::default();
let p2 = Progress::default();
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);

let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions ethcore/src/snapshot/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum Error {
ChunkTooLarge,
/// Snapshots not supported by the consensus engine.
SnapshotsUnsupported,
/// Aborted snapshot
SnapshotAborted,
/// Bad epoch transition.
BadEpochProof(u64),
/// Wrong chunk format.
Expand Down Expand Up @@ -103,6 +105,7 @@ impl fmt::Display for Error {
Error::ChunkTooSmall => write!(f, "Chunk size is too small."),
Error::ChunkTooLarge => write!(f, "Chunk size is too large."),
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
Expand Down
5 changes: 1 addition & 4 deletions ethcore/src/snapshot/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,7 @@ impl LooseReader {

dir.pop();

Ok(LooseReader {
dir: dir,
manifest: manifest,
})
Ok(LooseReader { dir, manifest })
}
}

Expand Down
Loading