Light protocol syncing improvements (#4212)
* remove old lint silencer

* dispatch requests only to peers with higher TD

* dynamic target for sync rounds

* use round pivots instead of frames, fix test

* fix total difficulty calculation for test client

* fix broken reorg algorithm

* fork test, fix ancestor search
rphmeier authored and arkpar committed Jan 20, 2017
1 parent 3ff9324 commit a791cb5
Showing 7 changed files with 254 additions and 80 deletions.
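The changes below center on two ideas: peers are ordered by their advertised total difficulty (TD), and header requests are dispatched only to peers whose TD is strictly above our own. Here is a minimal, self-contained Rust sketch of that pattern, using hypothetical stand-in types (u64 in place of U256, no hashes) rather than the code in this commit:

use std::collections::HashMap;

type PeerId = usize;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ChainInfo {
    head_td: u64, // stand-in for U256; compared first, so ordering is by TD
    head_num: u64,
}

// peers eligible for dispatch: strictly better TD than ours.
fn eligible(peers: &HashMap<PeerId, ChainInfo>, our_td: u64) -> Vec<PeerId> {
    peers.iter()
        .filter(|&(_, info)| info.head_td > our_td)
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let mut peers = HashMap::new();
    peers.insert(1, ChainInfo { head_td: 90, head_num: 9 });
    peers.insert(2, ChainInfo { head_td: 120, head_num: 12 });

    // best seen block on the network: a plain `max` over Option<ChainInfo>,
    // since None compares below any Some.
    let best_seen = peers.values().cloned().map(Some).fold(None, std::cmp::max);
    assert_eq!(best_seen.map(|b| b.head_num), Some(12));

    // with our TD at 100, only peer 2 qualifies for requests.
    assert_eq!(eligible(&peers, 100), vec![2]);
}

Because None orders below any Some, making ChainInfo ordered is what lets the best-seen bookkeeping in sync/src/light_sync/mod.rs collapse into plain ::std::cmp::max calls, as the hunks below show.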
30 changes: 24 additions & 6 deletions ethcore/light/src/client/header_chain.rs
@@ -131,7 +131,7 @@ impl HeaderChain {
         let total_difficulty = parent_td + view.difficulty();

         // insert headers and candidates entries.
-        candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash})
+        candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash })
             .candidates.push(Candidate {
                 hash: hash,
                 parent_hash: parent_hash,
@@ -144,17 +144,26 @@ impl HeaderChain {
         // respective candidates vectors.
         if self.best_block.read().total_difficulty < total_difficulty {
             let mut canon_hash = hash;
-            for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
-                if entry.canonical_hash == canon_hash { break; }
+            for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
+                if height != number && entry.canonical_hash == canon_hash { break; }

-                let canon = entry.candidates.iter().find(|x| x.hash == canon_hash)
+                trace!(target: "chain", "Setting new canonical block {} for block height {}",
+                    canon_hash, height);
+
+                let canon_pos = entry.candidates.iter().position(|x| x.hash == canon_hash)
                     .expect("blocks are only inserted if parent is present; or this is the block we just added; qed");

+                // move the new canonical entry to the front and set the
+                // era's canonical hash.
+                entry.candidates.swap(0, canon_pos);
+                entry.canonical_hash = canon_hash;
+
                 // what about reorgs > cht::SIZE + HISTORY?
                 // resetting to the last block of a given CHT should be possible.
-                canon_hash = canon.parent_hash;
+                canon_hash = entry.candidates[0].parent_hash;
             }

+            trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty);
             *self.best_block.write() = BlockDescriptor {
                 hash: hash,
                 number: number,
@@ -360,6 +369,15 @@ mod tests {
             }
         }

-        assert_eq!(chain.best_block().number, 12);
+        let (mut num, mut canon_hash) = (chain.best_block().number, chain.best_block().hash);
+        assert_eq!(num, 12);
+
+        while num > 0 {
+            let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap());
+            assert_eq!(header.hash(), canon_hash);
+
+            canon_hash = *header.parent_hash();
+            num -= 1;
+        }
     }
 }
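For context on the "fix broken reorg algorithm" bullet: the old loop located the new canonical candidate at each height but never wrote the canonical hash back, and its early-exit test could not distinguish the insertion height from an ancestor era that already agreed with the new chain. Below is a standalone sketch of the repaired walk, using hypothetical simplified types (u64 hashes, a bare BTreeMap) rather than the actual HeaderChain:

use std::collections::BTreeMap;

#[derive(Clone)]
struct Candidate { hash: u64, parent_hash: u64 }

struct Entry { candidates: Vec<Candidate>, canonical_hash: u64 }

// walk from the insertion height toward genesis, re-pointing each era's
// canonical hash, and stop once an ancestor era already agrees.
fn canonize(chain: &mut BTreeMap<u64, Entry>, number: u64, hash: u64) {
    let mut canon_hash = hash;
    for (&height, entry) in chain.iter_mut().rev().skip_while(|&(&h, _)| h > number) {
        // the insertion height may record a sibling as canonical, so only an
        // ancestor era that already matches lets us stop early.
        if height != number && entry.canonical_hash == canon_hash { break; }

        let pos = entry.candidates.iter().position(|c| c.hash == canon_hash)
            .expect("ancestors are present by construction");
        entry.candidates.swap(0, pos);
        entry.canonical_hash = canon_hash;
        canon_hash = entry.candidates[0].parent_hash;
    }
}

fn main() {
    let mut chain = BTreeMap::new();
    chain.insert(1, Entry {
        candidates: vec![Candidate { hash: 0xA1, parent_hash: 0x00 }],
        canonical_hash: 0xA1,
    });
    chain.insert(2, Entry {
        candidates: vec![
            Candidate { hash: 0xA2, parent_hash: 0xA1 }, // old canonical tip
            Candidate { hash: 0xB2, parent_hash: 0xA1 }, // heavier sibling just inserted
        ],
        canonical_hash: 0xA2,
    });

    canonize(&mut chain, 2, 0xB2);
    assert_eq!(chain[&2].canonical_hash, 0xB2); // re-pointed to the new chain
    assert_eq!(chain[&1].canonical_hash, 0xA1); // shared ancestor untouched
}

The new test above exercises the same invariant from the other direction: starting at the best block, following parent hashes downward must reproduce every era's canonical hash.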
2 changes: 1 addition & 1 deletion ethcore/src/client/test_client.rs
@@ -144,7 +144,7 @@ impl TestBlockChainClient {
             genesis_hash: H256::new(),
             extra_data: extra_data,
             last_hash: RwLock::new(H256::new()),
-            difficulty: RwLock::new(From::from(0)),
+            difficulty: RwLock::new(spec.genesis_header().difficulty().clone()),
             balances: RwLock::new(HashMap::new()),
             nonces: RwLock::new(HashMap::new()),
             storage: RwLock::new(HashMap::new()),
1 change: 0 additions & 1 deletion ethcore/src/verification/queue/kind.rs
@@ -177,7 +177,6 @@ pub mod headers {
     }

     /// A mode for verifying headers.
-    #[allow(dead_code)]
     pub struct Headers;

     impl Kind for Headers {
109 changes: 69 additions & 40 deletions sync/src/light_sync/mod.rs
@@ -55,13 +55,25 @@ mod sync_round;
 mod tests;

 /// Peer chain info.
-#[derive(Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 struct ChainInfo {
     head_td: U256,
     head_hash: H256,
     head_num: u64,
 }

+impl PartialOrd for ChainInfo {
+    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
+        self.head_td.partial_cmp(&other.head_td)
+    }
+}
+
+impl Ord for ChainInfo {
+    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
+        self.head_td.cmp(&other.head_td)
+    }
+}
+
 struct Peer {
     status: ChainInfo,
 }
@@ -74,6 +86,7 @@ impl Peer {
         }
     }
 }
+
 // search for a common ancestor with the best chain.
 #[derive(Debug)]
 enum AncestorSearch {
@@ -107,13 +120,18 @@ impl AncestorSearch {
                     return AncestorSearch::FoundCommon(header.number(), header.hash());
                 }

-                if header.number() <= first_num {
+                if header.number() < first_num {
                     debug!(target: "sync", "Prehistoric common ancestor with best chain.");
                     return AncestorSearch::Prehistoric;
                 }
             }

-            AncestorSearch::Queued(start - headers.len() as u64)
+            let probe = start - headers.len() as u64;
+            if probe == 0 {
+                AncestorSearch::Genesis
+            } else {
+                AncestorSearch::Queued(probe)
+            }
         }
         Err(e) => {
             trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e);
@@ -137,12 +155,13 @@ impl AncestorSearch {

     match self {
         AncestorSearch::Queued(start) => {
+            let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE);
             trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
-                BATCH_SIZE, start);
+                batch_size, start);

             let req = request::Headers {
                 start: start.into(),
-                max: ::std::cmp::min(start as usize, BATCH_SIZE),
+                max: batch_size,
                 skip: 0,
                 reverse: true,
             };
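The two hunks above fix the ancestor-search arithmetic: the reverse-header batch is clamped to the number of blocks that actually exist at or below start, and once the probe height reaches 0 the search transitions to the new AncestorSearch::Genesis state instead of queueing an impossible request at block 0. A tiny standalone sketch of the arithmetic (hypothetical helper, not the code in this commit):

const BATCH_SIZE: usize = 64;

#[derive(Debug, PartialEq)]
enum AncestorSearch {
    Queued(u64),
    Genesis,
}

// after `received` reverse headers starting at `start` found no common
// ancestor, decide where the next probe begins.
fn next_probe(start: u64, received: usize) -> AncestorSearch {
    let probe = start - received as u64;
    if probe == 0 { AncestorSearch::Genesis } else { AncestorSearch::Queued(probe) }
}

fn main() {
    // at start = 5 only 5 headers exist at or below, so the batch is clamped
    // to 5 rather than the full BATCH_SIZE...
    let batch_size = std::cmp::min(5u64 as usize, BATCH_SIZE);
    assert_eq!(batch_size, 5);

    // ...and once those 5 arrive the search bottoms out at the genesis
    // instead of queueing a request starting at block 0.
    assert_eq!(next_probe(5, batch_size), AncestorSearch::Genesis);
    assert_eq!(next_probe(500, BATCH_SIZE), AncestorSearch::Queued(436));
}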
@@ -185,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> {

 /// Light client synchronization manager. See module docs for more details.
 pub struct LightSync<L: LightChainClient> {
-    best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
+    best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
     peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
     client: Arc<L>,
     rng: Mutex<OsRng>,
@@ -194,9 +213,7 @@ pub struct LightSync<L: LightChainClient> {

 impl<L: LightChainClient> Handler for LightSync<L> {
     fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
-        let our_best = self.client.chain_info().best_block_number;
-
-        if !capabilities.serve_headers || status.head_num <= our_best {
+        if !capabilities.serve_headers {
             trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
             ctx.disconnect_peer(ctx.peer());
             return;
@@ -210,9 +227,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {

         {
             let mut best = self.best_seen.lock();
-            if best.as_ref().map_or(true, |b| status.head_td > b.1) {
-                *best = Some((status.head_hash, status.head_td));
-            }
+            *best = ::std::cmp::max(best.clone(), Some(chain_info.clone()));
         }

         self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
@@ -231,17 +246,13 @@ impl<L: LightChainClient> Handler for LightSync<L> {

         let new_best = {
             let mut best = self.best_seen.lock();
-            let peer_best = (peer.status.head_hash, peer.status.head_td);

-            if best.as_ref().map_or(false, |b| b == &peer_best) {
+            if best.as_ref().map_or(false, |b| b == &peer.status) {
                 // search for next-best block.
-                let next_best: Option<(H256, U256)> = self.peers.read().values()
-                    .map(|p| p.lock())
-                    .map(|p| (p.status.head_hash, p.status.head_td))
-                    .fold(None, |acc, x| match acc {
-                        Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
-                        None => Some(x),
-                    });
+                let next_best: Option<ChainInfo> = self.peers.read().values()
+                    .map(|p| p.lock().status.clone())
+                    .map(Some)
+                    .fold(None, ::std::cmp::max);

                 *best = next_best;
             }
@@ -266,7 +277,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
     }

     fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
-        let last_td = {
+        let (last_td, chain_info) = {
             let peers = self.peers.read();
             match peers.get(&ctx.peer()) {
                 None => return,
@@ -278,7 +289,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
                         head_hash: announcement.head_hash,
                         head_num: announcement.head_num,
                     };
-                    last_td
+                    (last_td, peer.status.clone())
                 }
             }
         };
@@ -290,13 +301,12 @@ impl<L: LightChainClient> Handler for LightSync<L> {
             trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
             self.peers.write().remove(&ctx.peer());
             ctx.disconnect_peer(ctx.peer());
+            return
         }

         {
             let mut best = self.best_seen.lock();
-            if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
-                *best = Some((announcement.head_hash, announcement.head_td));
-            }
+            *best = ::std::cmp::max(best.clone(), Some(chain_info));
         }

         self.maintain_sync(ctx.as_basic());
@@ -352,10 +362,12 @@ impl<L: LightChainClient> LightSync<L> {
         *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
     }

+    // handles request dispatch, block import, and state machine transitions.
     fn maintain_sync(&self, ctx: &BasicContext) {
         const DRAIN_AMOUNT: usize = 128;

         let mut state = self.state.lock();
+        let chain_info = self.client.chain_info();
         debug!(target: "sync", "Maintaining sync ({:?})", &*state);

         // drain any pending blocks into the queue.
@@ -364,8 +376,7 @@ impl<L: LightChainClient> LightSync<L> {

         'a:
         loop {
-            let queue_info = self.client.queue_info();
-            if queue_info.is_full() { break }
+            if self.client.queue_info().is_full() { break }

             *state = match mem::replace(&mut *state, SyncState::Idle) {
                 SyncState::Rounds(round)
@@ -389,12 +400,23 @@ impl<L: LightChainClient> LightSync<L> {

         // handle state transitions.
         {
-            let chain_info = self.client.chain_info();
-            let best_td = chain_info.total_difficulty;
+            let best_td = chain_info.pending_total_difficulty;
+            let sync_target = match *self.best_seen.lock() {
+                Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash),
+                _ => {
+                    trace!(target: "sync", "No target to sync to.");
+                    *state = SyncState::Idle;
+                    return;
+                }
+            };

             match mem::replace(&mut *state, SyncState::Idle) {
-                _ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td)
-                    => *state = SyncState::Idle,
-                SyncState::Rounds(SyncRound::Abort(reason, _)) => {
+                SyncState::Rounds(SyncRound::Abort(reason, remaining)) => {
+                    if remaining.len() > 0 {
+                        *state = SyncState::Rounds(SyncRound::Abort(reason, remaining));
+                        return;
+                    }
+
                     match reason {
                         AbortReason::BadScaffold(bad_peers) => {
                             debug!(target: "sync", "Disabling peers responsible for bad scaffold");
@@ -403,32 +425,39 @@ impl<L: LightChainClient> LightSync<L> {
                             }
                         }
                         AbortReason::NoResponses => {}
+                        AbortReason::TargetReached => {
+                            debug!(target: "sync", "Sync target reached. Going idle");
+                            *state = SyncState::Idle;
+                            return;
+                        }
                     }

                     debug!(target: "sync", "Beginning search after aborted sync round");
                     self.begin_search(&mut state);
                 }
                 SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
                     // TODO: compare to best block and switch to another downloading
                     // method when close.
-                    *state = SyncState::Rounds(SyncRound::begin(num, hash));
+                    *state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target));
                 }
                 SyncState::AncestorSearch(AncestorSearch::Genesis) => {
                     // Same here.
                     let g_hash = chain_info.genesis_hash;
-                    *state = SyncState::Rounds(SyncRound::begin(0, g_hash));
+                    *state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target));
                 }
                 SyncState::Idle => self.begin_search(&mut state),
                 other => *state = other, // restore displaced state.
             }
         }

         // allow dispatching of requests.
+        // TODO: maybe wait until the amount of cumulative requests remaining is high enough
+        // to avoid pumping the failure rate.
         {
             let peers = self.peers.read();
-            let mut peer_ids: Vec<_> = peers.keys().cloned().collect();
+            let mut peer_ids: Vec<_> = peers.iter().filter_map(|(id, p)| {
+                if p.lock().status.head_td > chain_info.pending_total_difficulty {
+                    Some(*id)
+                } else {
+                    None
+                }
+            }).collect();
             let mut rng = self.rng.lock();

             // naive request dispatcher: just give to any peer which says it will
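The last hunks make maintain_sync derive an explicit sync target before any state transition: if no known peer beats our pending total difficulty there is nothing to sync to and the machine goes idle; otherwise rounds begin from the common ancestor toward that target. A simplified standalone sketch of that decision, with hypothetical u64 stand-ins for TD, block numbers, and hashes:

#[derive(Debug, PartialEq)]
enum SyncState {
    Idle,
    Rounds { start: (u64, u64), target: (u64, u64) }, // (number, hash) pairs
}

// best_seen is (td, num, hash) of the best block any peer has advertised.
fn transition(best_seen: Option<(u64, u64, u64)>,
              our_pending_td: u64,
              common_ancestor: (u64, u64)) -> SyncState {
    match best_seen {
        Some((td, num, hash)) if td > our_pending_td =>
            SyncState::Rounds { start: common_ancestor, target: (num, hash) },
        _ => SyncState::Idle, // no target to sync to
    }
}

fn main() {
    // a peer at TD 300 beats our pending TD of 250: begin rounds toward it.
    assert_eq!(
        transition(Some((300, 42, 0xABC)), 250, (40, 0xAAA)),
        SyncState::Rounds { start: (40, 0xAAA), target: (42, 0xABC) },
    );
    // nobody is ahead of us: go idle.
    assert_eq!(transition(Some((200, 20, 0xBBB)), 250, (18, 0x999)), SyncState::Idle);
}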