Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Light protocol syncing improvements #4212

Merged
merged 7 commits into from
Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions ethcore/light/src/client/header_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl HeaderChain {
let total_difficulty = parent_td + view.difficulty();

// insert headers and candidates entries.
candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash})
candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash })
.candidates.push(Candidate {
hash: hash,
parent_hash: parent_hash,
Expand All @@ -144,17 +144,26 @@ impl HeaderChain {
// respective candidates vectors.
if self.best_block.read().total_difficulty < total_difficulty {
let mut canon_hash = hash;
for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
if entry.canonical_hash == canon_hash { break; }
for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
if height != number && entry.canonical_hash == canon_hash { break; }

let canon = entry.candidates.iter().find(|x| x.hash == canon_hash)
trace!(target: "chain", "Setting new canonical block {} for block height {}",
canon_hash, height);

let canon_pos = entry.candidates.iter().position(|x| x.hash == canon_hash)
.expect("blocks are only inserted if parent is present; or this is the block we just added; qed");

// move the new canonical entry to the front and set the
// era's canonical hash.
entry.candidates.swap(0, canon_pos);
entry.canonical_hash = canon_hash;

// what about reorgs > cht::SIZE + HISTORY?
// resetting to the last block of a given CHT should be possible.
canon_hash = canon.parent_hash;
canon_hash = entry.candidates[0].parent_hash;
}

trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty);
*self.best_block.write() = BlockDescriptor {
hash: hash,
number: number,
Expand Down Expand Up @@ -360,6 +369,15 @@ mod tests {
}
}

assert_eq!(chain.best_block().number, 12);
let (mut num, mut canon_hash) = (chain.best_block().number, chain.best_block().hash);
assert_eq!(num, 12);

while num > 0 {
let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap());
assert_eq!(header.hash(), canon_hash);

canon_hash = *header.parent_hash();
num -= 1;
}
}
}
2 changes: 1 addition & 1 deletion ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl TestBlockChainClient {
genesis_hash: H256::new(),
extra_data: extra_data,
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
difficulty: RwLock::new(spec.genesis_header().difficulty().clone()),
balances: RwLock::new(HashMap::new()),
nonces: RwLock::new(HashMap::new()),
storage: RwLock::new(HashMap::new()),
Expand Down
1 change: 0 additions & 1 deletion ethcore/src/verification/queue/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ pub mod headers {
}

/// A mode for verifying headers.
#[allow(dead_code)]
pub struct Headers;

impl Kind for Headers {
Expand Down
109 changes: 69 additions & 40 deletions sync/src/light_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,25 @@ mod sync_round;
mod tests;

/// Peer chain info.
#[derive(Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChainInfo {
head_td: U256,
head_hash: H256,
head_num: u64,
}

impl PartialOrd for ChainInfo {
fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
self.head_td.partial_cmp(&other.head_td)
}
}

impl Ord for ChainInfo {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
self.head_td.cmp(&other.head_td)
}
}

struct Peer {
status: ChainInfo,
}
Expand All @@ -74,6 +86,7 @@ impl Peer {
}
}
}

// search for a common ancestor with the best chain.
#[derive(Debug)]
enum AncestorSearch {
Expand Down Expand Up @@ -107,13 +120,18 @@ impl AncestorSearch {
return AncestorSearch::FoundCommon(header.number(), header.hash());
}

if header.number() <= first_num {
if header.number() < first_num {
debug!(target: "sync", "Prehistoric common ancestor with best chain.");
return AncestorSearch::Prehistoric;
}
}

AncestorSearch::Queued(start - headers.len() as u64)
let probe = start - headers.len() as u64;
if probe == 0 {
AncestorSearch::Genesis
} else {
AncestorSearch::Queued(probe)
}
}
Err(e) => {
trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e);
Expand All @@ -137,12 +155,13 @@ impl AncestorSearch {

match self {
AncestorSearch::Queued(start) => {
let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE);
trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
BATCH_SIZE, start);
batch_size, start);

let req = request::Headers {
start: start.into(),
max: ::std::cmp::min(start as usize, BATCH_SIZE),
max: batch_size,
skip: 0,
reverse: true,
};
Expand Down Expand Up @@ -185,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> {

/// Light client synchronization manager. See module docs for more details.
pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>,
rng: Mutex<OsRng>,
Expand All @@ -194,9 +213,7 @@ pub struct LightSync<L: LightChainClient> {

impl<L: LightChainClient> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
let our_best = self.client.chain_info().best_block_number;

if !capabilities.serve_headers || status.head_num <= our_best {
if !capabilities.serve_headers {
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
ctx.disconnect_peer(ctx.peer());
return;
Expand All @@ -210,9 +227,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {

{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
*best = Some((status.head_hash, status.head_td));
}
*best = ::std::cmp::max(best.clone(), Some(chain_info.clone()));
}

self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
Expand All @@ -231,17 +246,13 @@ impl<L: LightChainClient> Handler for LightSync<L> {

let new_best = {
let mut best = self.best_seen.lock();
let peer_best = (peer.status.head_hash, peer.status.head_td);

if best.as_ref().map_or(false, |b| b == &peer_best) {
if best.as_ref().map_or(false, |b| b == &peer.status) {
// search for next-best block.
let next_best: Option<(H256, U256)> = self.peers.read().values()
.map(|p| p.lock())
.map(|p| (p.status.head_hash, p.status.head_td))
.fold(None, |acc, x| match acc {
Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
None => Some(x),
});
let next_best: Option<ChainInfo> = self.peers.read().values()
.map(|p| p.lock().status.clone())
.map(Some)
.fold(None, ::std::cmp::max);

*best = next_best;
}
Expand All @@ -266,7 +277,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
}

fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
let last_td = {
let (last_td, chain_info) = {
let peers = self.peers.read();
match peers.get(&ctx.peer()) {
None => return,
Expand All @@ -278,7 +289,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
head_hash: announcement.head_hash,
head_num: announcement.head_num,
};
last_td
(last_td, peer.status.clone())
}
}
};
Expand All @@ -290,13 +301,12 @@ impl<L: LightChainClient> Handler for LightSync<L> {
trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
self.peers.write().remove(&ctx.peer());
ctx.disconnect_peer(ctx.peer());
return
}

{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
*best = Some((announcement.head_hash, announcement.head_td));
}
*best = ::std::cmp::max(best.clone(), Some(chain_info));
}

self.maintain_sync(ctx.as_basic());
Expand Down Expand Up @@ -352,10 +362,12 @@ impl<L: LightChainClient> LightSync<L> {
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
}

// handles request dispatch, block import, and state machine transitions.
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;

let mut state = self.state.lock();
let chain_info = self.client.chain_info();
debug!(target: "sync", "Maintaining sync ({:?})", &*state);

// drain any pending blocks into the queue.
Expand All @@ -364,8 +376,7 @@ impl<L: LightChainClient> LightSync<L> {

'a:
loop {
let queue_info = self.client.queue_info();
if queue_info.is_full() { break }
if self.client.queue_info().is_full() { break }

*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
Expand All @@ -389,12 +400,23 @@ impl<L: LightChainClient> LightSync<L> {

// handle state transitions.
{
let chain_info = self.client.chain_info();
let best_td = chain_info.total_difficulty;
let best_td = chain_info.pending_total_difficulty;
let sync_target = match *self.best_seen.lock() {
Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash),
_ => {
trace!(target: "sync", "No target to sync to.");
*state = SyncState::Idle;
return;
}
};

match mem::replace(&mut *state, SyncState::Idle) {
_ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td)
=> *state = SyncState::Idle,
SyncState::Rounds(SyncRound::Abort(reason, _)) => {
SyncState::Rounds(SyncRound::Abort(reason, remaining)) => {
if remaining.len() > 0 {
*state = SyncState::Rounds(SyncRound::Abort(reason, remaining));
return;
}

match reason {
AbortReason::BadScaffold(bad_peers) => {
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
Expand All @@ -403,32 +425,39 @@ impl<L: LightChainClient> LightSync<L> {
}
}
AbortReason::NoResponses => {}
AbortReason::TargetReached => {
debug!(target: "sync", "Sync target reached. Going idle");
*state = SyncState::Idle;
return;
}
}

debug!(target: "sync", "Beginning search after aborted sync round");
self.begin_search(&mut state);
}
SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
// TODO: compare to best block and switch to another downloading
// method when close.
*state = SyncState::Rounds(SyncRound::begin(num, hash));
*state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target));
}
SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here.
let g_hash = chain_info.genesis_hash;
*state = SyncState::Rounds(SyncRound::begin(0, g_hash));
*state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target));
}
SyncState::Idle => self.begin_search(&mut state),
other => *state = other, // restore displaced state.
}
}

// allow dispatching of requests.
// TODO: maybe wait until the amount of cumulative requests remaining is high enough
// to avoid pumping the failure rate.
{
let peers = self.peers.read();
let mut peer_ids: Vec<_> = peers.keys().cloned().collect();
let mut peer_ids: Vec<_> = peers.iter().filter_map(|(id, p)| {
if p.lock().status.head_td > chain_info.pending_total_difficulty {
Some(*id)
} else {
None
}
}).collect();
let mut rng = self.rng.lock();

// naive request dispatcher: just give to any peer which says it will
Expand Down
Loading