Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Test harness for lightsync (#4109)
Browse files Browse the repository at this point in the history
* make on_connect/disconnect public

* free flow params constructor

* Shared ownership of LES handlers

* light provider impl for test client

* skeleton for testing light sync

* have test_client use actual genesis

* fix underflow in provider

* test harnesses for lightsync

* fix tests

* fix test failure caused by test_client changes
  • Loading branch information
rphmeier authored and arkpar committed Jan 11, 2017
1 parent 7286d42 commit 7123f19
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 104 deletions.
14 changes: 13 additions & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub trait LightChainClient: Send + Sync {
/// Clear the queue.
fn clear_queue(&self);

/// Flush the queue.
fn flush_queue(&self);

/// Get queue info.
fn queue_info(&self) -> queue::QueueInfo;

Expand Down Expand Up @@ -130,7 +133,7 @@ impl Client {

BlockChainInfo {
total_difficulty: best_block.total_difficulty,
pending_total_difficulty: best_block.total_difficulty,
pending_total_difficulty: best_block.total_difficulty + self.queue.total_difficulty(),
genesis_hash: genesis_hash,
best_block_hash: best_block.hash,
best_block_number: best_block.number,
Expand All @@ -151,6 +154,11 @@ impl Client {
self.chain.get_header(id)
}

/// Flush the header queue.
pub fn flush_queue(&self) {
self.queue.flush()
}

/// Get the `i`th CHT root.
pub fn cht_root(&self, i: usize) -> Option<H256> {
self.chain.cht_root(i)
Expand Down Expand Up @@ -211,6 +219,10 @@ impl LightChainClient for Client {
self.queue.clear()
}

fn flush_queue(&self) {
Client::flush_queue(self);
}

fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}
Expand Down
23 changes: 20 additions & 3 deletions ethcore/light/src/net/buffer_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! This module provides an interface for configuration of buffer
//! flow costs and recharge rates.
//!
//! Current default costs are picked completely arbitrarily, not based
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
use request;
Expand Down Expand Up @@ -184,6 +184,23 @@ impl FlowParams {
}
}

/// Create effectively infinite flow params.
pub fn free() -> Self {
let free_cost = Cost(0.into(), 0.into());
FlowParams {
limit: (!0u64).into(),
recharge: 1.into(),
costs: CostTable {
headers: free_cost.clone(),
bodies: free_cost.clone(),
receipts: free_cost.clone(),
state_proofs: free_cost.clone(),
contract_codes: free_cost.clone(),
header_proofs: free_cost.clone(),
}
}
}

/// Get a reference to the buffer limit.
pub fn limit(&self) -> &U256 { &self.limit }

Expand All @@ -209,7 +226,7 @@ impl FlowParams {
cost.0 + (amount * cost.1)
}

/// Compute the maximum number of costs of a specific kind which can be made
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
Expand Down Expand Up @@ -317,4 +334,4 @@ mod tests {

assert_eq!(buffer.estimate, 100.into());
}
}
}
155 changes: 84 additions & 71 deletions ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ use self::buffer_flow::{Buffer, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;

mod buffer_flow;
mod context;
mod error;
mod status;

#[cfg(test)]
mod tests;

pub mod buffer_flow;

pub use self::error::Error;
pub use self::context::{BasicContext, EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
Expand Down Expand Up @@ -237,7 +238,7 @@ pub struct LightProtocol {
pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
handlers: Vec<Arc<Handler>>,
req_id: AtomicUsize,
}

Expand Down Expand Up @@ -376,11 +377,11 @@ impl LightProtocol {
}

/// Add an event handler.
/// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is.
///
/// These are intended to be added when the protocol structure
/// is initialized as a means of customizing its behavior.
pub fn add_handler(&mut self, handler: Box<Handler>) {
/// is initialized as a means of customizing its behavior,
/// and dispatching requests immediately upon events.
pub fn add_handler(&mut self, handler: Arc<Handler>) {
self.handlers.push(handler);
}

Expand Down Expand Up @@ -440,8 +441,10 @@ impl LightProtocol {
}
}

// handle a packet using the given io context.
fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
/// Handle an LES packet using the given io context.
/// Packet data is _untrusted_, which means that invalid data won't lead to
/// issues.
pub fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);

trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer);
Expand Down Expand Up @@ -481,67 +484,8 @@ impl LightProtocol {
}
}

// check timeouts and punish peers.
fn timeout_check(&self, io: &IoContext) {
let now = SteadyTime::now();

// handshake timeout
{
let mut pending = self.pending_peers.write();
let slowpokes: Vec<_> = pending.iter()
.filter(|&(_, ref peer)| {
peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now
})
.map(|(&p, _)| p)
.collect();

for slowpoke in slowpokes {
debug!(target: "les", "Peer {} handshake timed out", slowpoke);
pending.remove(&slowpoke);
io.disconnect_peer(slowpoke);
}
}

// request timeouts
{
for r in self.pending_requests.read().values() {
let kind_timeout = match r.request.kind() {
request::Kind::Headers => timeout::HEADERS,
request::Kind::Bodies => timeout::BODIES,
request::Kind::Receipts => timeout::RECEIPTS,
request::Kind::StateProofs => timeout::PROOFS,
request::Kind::Codes => timeout::CONTRACT_CODES,
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
};

if r.timestamp + Duration::milliseconds(kind_timeout) <= now {
debug!(target: "les", "Request for {:?} from peer {} timed out",
r.request.kind(), r.peer_id);

// keep the request in the `pending` set for now so
// on_disconnect will pass unfulfilled ReqIds to handlers.
// in the case that a response is received after this, the
// disconnect won't be cancelled but the ReqId won't be
// marked as abandoned.
io.disconnect_peer(r.peer_id);
}
}
}
}

fn tick_handlers(&self, io: &IoContext) {
for handler in &self.handlers {
handler.tick(&TickCtx {
io: io,
proto: self,
})
}
}
}

impl LightProtocol {
// called when a peer connects.
fn on_connect(&self, peer: &PeerId, io: &IoContext) {
/// called when a peer connects.
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(*peer, io, e); return }
Expand Down Expand Up @@ -575,8 +519,8 @@ impl LightProtocol {
io.send(*peer, packet::STATUS, status_packet);
}

// called when a peer disconnects.
fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
/// called when a peer disconnects.
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
trace!(target: "les", "Peer {} disconnecting", peer);


Expand Down Expand Up @@ -605,6 +549,75 @@ impl LightProtocol {
}
}

// check timeouts and punish peers.
fn timeout_check(&self, io: &IoContext) {
let now = SteadyTime::now();

// handshake timeout
{
let mut pending = self.pending_peers.write();
let slowpokes: Vec<_> = pending.iter()
.filter(|&(_, ref peer)| {
peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now
})
.map(|(&p, _)| p)
.collect();

for slowpoke in slowpokes {
debug!(target: "les", "Peer {} handshake timed out", slowpoke);
pending.remove(&slowpoke);
io.disconnect_peer(slowpoke);
}
}

// request timeouts
{
for r in self.pending_requests.read().values() {
let kind_timeout = match r.request.kind() {
request::Kind::Headers => timeout::HEADERS,
request::Kind::Bodies => timeout::BODIES,
request::Kind::Receipts => timeout::RECEIPTS,
request::Kind::StateProofs => timeout::PROOFS,
request::Kind::Codes => timeout::CONTRACT_CODES,
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
};

if r.timestamp + Duration::milliseconds(kind_timeout) <= now {
debug!(target: "les", "Request for {:?} from peer {} timed out",
r.request.kind(), r.peer_id);

// keep the request in the `pending` set for now so
// on_disconnect will pass unfulfilled ReqIds to handlers.
// in the case that a response is received after this, the
// disconnect won't be cancelled but the ReqId won't be
// marked as abandoned.
io.disconnect_peer(r.peer_id);
}
}
}
}

/// Execute the given closure with a basic context derived from the I/O context.
pub fn with_context<F, T>(&self, io: &IoContext, f: F) -> T
where F: FnOnce(&BasicContext) -> T
{
f(&TickCtx {
io: io,
proto: self,
})
}

fn tick_handlers(&self, io: &IoContext) {
for handler in &self.handlers {
handler.tick(&TickCtx {
io: io,
proto: self,
})
}
}
}

impl LightProtocol {
// Handle status message from peer.
fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(peer) {
Expand Down
13 changes: 8 additions & 5 deletions ethcore/light/src/net/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! These don't test of the higher level logic on top of
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::client::{EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId;
use ethcore::transaction::PendingTransaction;
use ethcore::encoded;
Expand Down Expand Up @@ -88,7 +88,7 @@ impl Provider for TestProvider {
}

fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64> {
self.0.client.tree_route(a, b).map(|route| route.index as u64)
self.0.client.reorg_depth(a, b)
}

fn earliest_state(&self) -> Option<u64> {
Expand Down Expand Up @@ -305,7 +305,9 @@ fn get_block_bodies() {
}

let request = request::Bodies {
block_hashes: (0..10).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap()).collect(),
block_hashes: (0..10).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).collect()
};

let req_id = 111;
Expand Down Expand Up @@ -353,8 +355,9 @@ fn get_block_receipts() {

// find the first 10 block hashes starting with `f` because receipts are only provided
// by the test client in that case.
let block_hashes: Vec<_> = (0..1000).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap())
.filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let block_hashes: Vec<_> = (0..1000).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();

let request = request::Receipts {
block_hashes: block_hashes.clone(),
Expand Down
2 changes: 1 addition & 1 deletion ethcore/light/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub trait Provider: Send + Sync {

(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.take_while(|x| if req.reverse { x < &start_num } else { best_num.saturating_sub(start_num) >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.block_header(BlockId::Number(x)))
.take_while(|x| x.is_some())
Expand Down
Loading

0 comments on commit 7123f19

Please sign in to comment.