This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Merge pull request #2044 from ethcore/periodic_snapshot
Periodic snapshots
rphmeier authored Sep 6, 2016
2 parents 5c5d9c8 + f054a7b commit 31cd965
Showing 8 changed files with 346 additions and 23 deletions.
2 changes: 1 addition & 1 deletion ethcore/src/client/chain_notify.rs
@@ -20,7 +20,7 @@ use util::H256;
/// Represents what has to be handled by actor listening to chain events
#[derive(Ipc)]
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks
/// fires when chain has new blocks.
fn new_blocks(&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
17 changes: 15 additions & 2 deletions ethcore/src/service.rs
@@ -46,6 +46,8 @@ pub enum ClientIoMessage {
FeedStateChunk(H256, Bytes),
/// Feed a block chunk to the snapshot service
FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number.
TakeSnapshot(u64),
}

/// Client service setup. Creates and registers client and network services with the IO subsystem.
@@ -145,16 +147,22 @@ struct ClientIoHandler {
}

const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;

const CLIENT_TICK_MS: u64 = 5000;
const SNAPSHOT_TICK_MS: u64 = 10000;

impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
}

fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if timer == CLIENT_TICK_TIMER {
self.client.tick();
match timer {
CLIENT_TICK_TIMER => self.client.tick(),
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
_ => warn!("IO service triggered unregistered timer '{}'", timer),
}
}

@@ -170,6 +178,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
}
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk),
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk),
ClientIoMessage::TakeSnapshot(num) => {
if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) {
warn!("Failed to take snapshot at block #{}: {}", num, e);
}
}
_ => {} // ignore other messages
}
}
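Not part of this commit: a std-only sketch of the round trip the new TakeSnapshot variant enables. An mpsc channel stands in for the client's IO channel so the example is self-contained; in Parity the message is posted onto the service's IO queue and handled in ClientIoHandler::message as shown above.

use std::sync::mpsc;

// message type reduced to the variant added in this commit
enum ClientIoMessage {
    TakeSnapshot(u64),
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // producer side, e.g. a watcher that noticed a period boundary:
    tx.send(ClientIoMessage::TakeSnapshot(30_000)).expect("receiver is alive");
    drop(tx);

    // handler side, mirroring ClientIoHandler::message above:
    for msg in rx {
        match msg {
            ClientIoMessage::TakeSnapshot(num) => {
                // the real handler calls self.snapshot.take_snapshot(&*self.client, num)
                // and logs a warning on error
                println!("taking snapshot at block #{}", num);
            }
        }
    }
}
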
22 changes: 18 additions & 4 deletions ethcore/src/snapshot/mod.rs
@@ -44,8 +44,10 @@ use crossbeam::{scope, ScopedJoinHandle};
use rand::{Rng, OsRng};

pub use self::error::Error;

pub use self::service::{Service, DatabaseRestore};
pub use self::traits::{SnapshotService, RemoteSnapshotService};
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
pub use types::restoration_status::RestorationStatus;

@@ -55,6 +57,7 @@ pub mod service;
mod account;
mod block;
mod error;
mod watcher;

#[cfg(test)]
mod tests;
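The new watcher module registered above (and re-exported as Watcher) is one of the changed files not shown in this view. As a rough, hypothetical illustration of the period-based trigger such a watcher needs — the function name and the period/history parameters below are invented, not taken from the module:

// Hypothetical helper: once the chain head crosses into a new snapshot
// period, pick a block a fixed distance behind the head to snapshot.
fn block_to_snapshot(head: u64, last_snapshotted: u64, period: u64, history: u64) -> Option<u64> {
    if period == 0 {
        return None;
    }
    if head / period > last_snapshotted / period {
        // stay `history` blocks behind the head so the chosen block is
        // unlikely to be reorganised away
        Some(head.saturating_sub(history))
    } else {
        None
    }
}

fn main() {
    // with a 10_000-block period and 100 blocks of history:
    assert_eq!(block_to_snapshot(20_042, 19_990, 10_000, 100), Some(19_942));
    assert_eq!(block_to_snapshot(20_050, 20_042, 10_000, 100), None);
}
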
@@ -80,17 +83,28 @@ pub struct Progress {
}

impl Progress {
/// Reset the progress.
pub fn reset(&self) {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
self.size.store(0, Ordering::Release);

// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
self.done.store(false, Ordering::Release);
}

/// Get the number of accounts snapshotted thus far.
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) }
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) }

/// Get the number of blocks snapshotted thus far.
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) }
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }

/// Get the written size of the snapshot in bytes.
pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) }
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }

/// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) }
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }

}
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
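Not from the commit: a small sketch of how the counters above are intended to be used — a snapshotting thread bumps the atomics as it writes chunks while another thread (the service's periodic tick) reads them for logging. The field set is reconstructed from reset; the chunk counts and sizes are invented, and SeqCst is used throughout for simplicity.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// field set reconstructed from the `reset` method above
#[derive(Default)]
struct Progress {
    accounts: AtomicUsize,
    blocks: AtomicUsize,
    size: AtomicUsize,
    done: AtomicBool,
}

fn main() {
    let progress = Arc::new(Progress::default());

    let worker = {
        let progress = progress.clone();
        thread::spawn(move || {
            // pretend we wrote five account chunks and one block chunk
            for _ in 0..5 {
                progress.accounts.fetch_add(1_000, Ordering::SeqCst);
                progress.size.fetch_add(64 * 1024, Ordering::SeqCst);
            }
            progress.blocks.fetch_add(1, Ordering::SeqCst);
            progress.done.store(true, Ordering::SeqCst);
        })
    };

    // what a periodic tick could log while the worker runs
    println!(
        "Snapshot: {} accounts {} blocks {} bytes",
        progress.accounts.load(Ordering::SeqCst),
        progress.blocks.load(Ordering::SeqCst),
        progress.size.load(Ordering::SeqCst),
    );

    worker.join().expect("snapshot thread panicked");
}
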
107 changes: 91 additions & 16 deletions ethcore/src/snapshot/service.rs
@@ -27,8 +27,10 @@ use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, Sna
use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};

use blockchain::BlockChain;
use client::Client;
use engines::Engine;
use error::Error;
use ids::BlockID;
use service::ClientIoMessage;
use spec::Spec;

@@ -39,8 +41,25 @@ use util::journaldb::Algorithm;
use util::kvdb::{Database, DatabaseConfig};
use util::snappy;

/// Helper for removing directories in case of error.
struct Guard(bool, PathBuf);

impl Guard {
fn new(path: PathBuf) -> Self { Guard(true, path) }

fn disarm(mut self) { self.0 = false }
}

impl Drop for Guard {
fn drop(&mut self) {
if self.0 {
let _ = fs::remove_dir_all(&self.1);
}
}
}

/// External database restoration handler
pub trait DatabaseRestore : Send + Sync {
pub trait DatabaseRestore: Send + Sync {
/// Restart with a new backend. Takes ownership of passed database and moves it to a new location.
fn restore_db(&self, new_db: &str) -> Result<(), Error>;
}
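To make the purpose of Guard concrete, here is a self-contained usage sketch (not from the diff; the directory name and payload are invented): any error that propagates before disarm lets the Drop impl sweep away the half-written directory, which is the clean-up the snapshot and restoration paths below rely on.

use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// same shape as the Guard introduced above
struct Guard(bool, PathBuf);

impl Guard {
    fn new(path: PathBuf) -> Self { Guard(true, path) }
    fn disarm(mut self) { self.0 = false }
}

impl Drop for Guard {
    fn drop(&mut self) {
        if self.0 {
            let _ = fs::remove_dir_all(&self.1);
        }
    }
}

// stand-in for the real snapshot-writing work
fn write_snapshot(dir: &Path) -> io::Result<()> {
    fs::create_dir_all(dir)?;
    fs::write(dir.join("MANIFEST"), b"...")?;
    Ok(())
}

fn take_snapshot_guarded(dir: PathBuf) -> io::Result<()> {
    let guard = Guard::new(dir.clone());
    write_snapshot(&dir)?; // an early return here lets Drop delete the dir
    guard.disarm();        // success: keep the finished directory
    Ok(())
}

fn main() {
    let dir = std::env::temp_dir().join("guard_demo");
    match take_snapshot_guarded(dir) {
        Ok(()) => println!("snapshot kept"),
        Err(e) => println!("snapshot discarded: {}", e),
    }
}
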
@@ -55,6 +74,7 @@ struct Restoration {
writer: LooseWriter,
snappy_buffer: Bytes,
final_state_root: H256,
guard: Guard,
}

struct RestorationParams<'a> {
@@ -63,6 +83,7 @@ struct RestorationParams<'a> {
db_path: PathBuf, // database path
writer: LooseWriter, // writer for recovered snapshot.
genesis: &'a [u8], // genesis block of the chain.
guard: Guard, // guard for the restoration directory.
}

impl Restoration {
@@ -90,6 +111,7 @@ impl Restoration {
writer: params.writer,
snappy_buffer: Vec::new(),
final_state_root: root,
guard: params.guard,
})
}

@@ -138,6 +160,7 @@ impl Restoration {

try!(self.writer.finish(self.manifest));

self.guard.disarm();
Ok(())
}

@@ -168,6 +191,7 @@ pub struct Service {
state_chunks: AtomicUsize,
block_chunks: AtomicUsize,
db_restore: Arc<DatabaseRestore>,
progress: super::Progress,
}

impl Service {
@@ -197,6 +221,7 @@ impl Service {
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
db_restore: db_restore,
progress: Default::default(),
};

// create the root snapshot dir if it doesn't exist.
@@ -213,6 +238,13 @@ impl Service {
}
}

// delete the temporary snapshot dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.temp_snapshot_dir()) {
if e.kind() != ErrorKind::NotFound {
return Err(e.into())
}
}

Ok(service)
}
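A tiny sketch (not from the diff; names are invented) of the same tolerance the constructor applies above when clearing a leftover in_progress directory: deletion failures are real errors, but "already gone" is treated as success.

use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;

// remove a directory tree, ignoring the case where it does not exist
fn remove_dir_if_present(path: &Path) -> io::Result<()> {
    match fs::remove_dir_all(path) {
        Err(e) if e.kind() != ErrorKind::NotFound => Err(e),
        _ => Ok(()),
    }
}

fn main() {
    let gone = std::env::temp_dir().join("definitely_not_there");
    remove_dir_if_present(&gone).expect("NotFound is tolerated");
}
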

@@ -230,6 +262,13 @@ impl Service {
dir
}

// get the temporary snapshot dir.
fn temp_snapshot_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
dir.push("in_progress");
dir
}

// get the restoration directory.
fn restoration_dir(&self) -> PathBuf {
let mut dir = self.root_dir();
@@ -260,6 +299,48 @@ impl Service {
Ok(())
}

/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick(&self) {
if self.progress.done() { return }

let p = &self.progress;
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
}

/// Take a snapshot at the block with the given number.
/// calling this while a restoration is in progress or vice versa
/// will lead to a race condition where the first one to finish will
/// have their produced snapshot overwritten.
pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> {
info!("Taking snapshot at #{}", num);
self.progress.reset();

let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();

let _ = fs::remove_dir_all(&temp_dir);

let writer = try!(LooseWriter::new(temp_dir.clone()));

let guard = Guard::new(temp_dir.clone());
try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress));

info!("Finished taking snapshot at #{}", num);

let mut reader = self.reader.write();

// destroy the old snapshot reader.
*reader = None;

try!(fs::rename(temp_dir, &snapshot_dir));

*reader = Some(try!(LooseReader::new(snapshot_dir)));

guard.disarm();
Ok(())
}

/// Initialize the restoration synchronously.
pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> {
let rest_dir = self.restoration_dir();
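A condensed, self-contained sketch (not from the diff; paths and payload are made up) of the write-then-rename flow that take_snapshot follows above: the service builds the snapshot under in_progress, drops its old reader, and renames the finished directory into place before reopening a LooseReader on it.

use std::fs;
use std::io;
use std::path::Path;

// write the new snapshot beside the live one, then publish it with a rename
fn swap_in_snapshot(root: &Path) -> io::Result<()> {
    let temp_dir = root.join("in_progress");
    let snapshot_dir = root.join("current");

    // build the snapshot in the temporary directory first, so a crash or
    // error leaves the previous snapshot untouched
    fs::create_dir_all(&temp_dir)?;
    fs::write(temp_dir.join("MANIFEST"), b"...")?;

    // clear any previous copy so the rename cannot collide, then move the
    // finished snapshot into place in one step
    let _ = fs::remove_dir_all(&snapshot_dir);
    fs::rename(&temp_dir, &snapshot_dir)?;
    Ok(())
}

fn main() {
    let root = std::env::temp_dir().join("snapshot_demo");
    fs::create_dir_all(&root).expect("can create demo root");
    swap_in_snapshot(&root).expect("swap succeeds");
    println!("snapshot published under {:?}", root.join("current"));
}
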
Expand Down Expand Up @@ -288,6 +369,7 @@ impl Service {
db_path: self.restoration_db(),
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_dir),
};

*res = Some(try!(Restoration::new(params)));
@@ -328,14 +410,7 @@ impl Service {
try!(fs::create_dir(&snapshot_dir));

trace!(target: "snapshot", "copying restored snapshot files over");
for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) {
let path = try!(maybe_file).path();
if let Some(name) = path.file_name().map(|x| x.to_owned()) {
let mut new_path = snapshot_dir.clone();
new_path.push(name);
try!(fs::rename(path, new_path));
}
}
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));

let _ = fs::remove_dir_all(self.restoration_dir());

@@ -451,6 +526,12 @@ impl SnapshotService for Service {
}
}

impl Drop for Service {
fn drop(&mut self) {
self.abort_restore();
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -504,10 +585,4 @@ mod tests {
service.restore_state_chunk(Default::default(), vec![]);
service.restore_block_chunk(Default::default(), vec![]);
}
}

impl Drop for Service {
fn drop(&mut self) {
self.abort_restore();
}
}
}