Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track blocks we requested, always broadcast otherwise #2349

Merged
merged 1 commit into from
Jan 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl Peer {
/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
Expand All @@ -280,7 +280,7 @@ impl Peer {
}

pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
Expand All @@ -299,7 +299,7 @@ impl Peer {
}

pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has(bh.hash()) {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.connection
.as_ref()
Expand All @@ -318,7 +318,7 @@ impl Peer {
}

pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has(h) {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection
.as_ref()
Expand Down Expand Up @@ -350,7 +350,7 @@ impl Peer {
return self.send_tx_kernel_hash(kernel.hash());
}

if !self.tracking_adapter.has(kernel.hash()) {
if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
Expand Down Expand Up @@ -405,6 +405,7 @@ impl Peer {
/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.connection
.as_ref()
.unwrap()
Expand Down Expand Up @@ -499,29 +500,32 @@ fn stop_with_connection(connection: &conn::Tracker) {
}

/// Adapter implementation that forwards everything to an underlying adapter
/// but keeps track of the block and transaction hashes that were received.
/// but keeps track of the block and transaction hashes that were requested or
/// received.
#[derive(Clone)]
struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>,
known: Arc<RwLock<Vec<Hash>>>,
requested: Arc<RwLock<Vec<Hash>>>,
}

impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter {
adapter: adapter,
known: Arc::new(RwLock::new(vec![])),
known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
}
garyyu marked this conversation as resolved.
Show resolved Hide resolved
}

fn has(&self, hash: Hash) -> bool {
fn has_recv(&self, hash: Hash) -> bool {
let known = self.known.read();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
known.contains(&hash)
}

fn push(&self, hash: Hash) {
fn push_recv(&self, hash: Hash) {
let mut known = self.known.write();
if known.len() > MAX_TRACK_SIZE {
known.truncate(MAX_TRACK_SIZE);
Expand All @@ -530,6 +534,23 @@ impl TrackingAdapter {
known.insert(0, hash);
}
}

fn has_req(&self, hash: Hash) -> bool {
let requested = self.requested.read();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
requested.contains(&hash)
}

fn push_req(&self, hash: Hash) {
let mut requested = self.requested.write();
if requested.len() > MAX_TRACK_SIZE {
requested.truncate(MAX_TRACK_SIZE);
}
if !requested.contains(&hash) {
requested.insert(0, hash);
}
}
}

impl ChainAdapter for TrackingAdapter {
Expand All @@ -546,7 +567,7 @@ impl ChainAdapter for TrackingAdapter {
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) {
self.push(kernel_hash);
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
}

Expand All @@ -556,23 +577,24 @@ impl ChainAdapter for TrackingAdapter {
// correctly.
if !stem {
let kernel = &tx.kernels()[0];
self.push(kernel.hash());
self.push_recv(kernel.hash());
}
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
self.push(b.hash());
self.adapter.block_received(b, addr)
fn block_received(&self, b: core::Block, addr: SocketAddr, _was_requested: bool) -> bool {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
}

fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
self.push(cb.hash());
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
}

fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
self.push(bh.hash());
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
}

Expand Down
4 changes: 2 additions & 2 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,9 @@ impl ChainAdapter for Peers {
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
fn block_received(&self, b: core::Block, peer_addr: SocketAddr, was_requested: bool) -> bool {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr) {
if !self.adapter.block_received(b, peer_addr, was_requested) {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
Expand Down
4 changes: 3 additions & 1 deletion p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ impl MessageHandler for Protocol {
);
let b: core::Block = msg.body()?;

adapter.block_received(b, self.addr);
// we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false);
Ok(None)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl ChainAdapter for DummyAdapter {
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool {
true
}
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool {
fn block_received(&self, _: core::Block, _: SocketAddr, _: bool) -> bool {
true
}
fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ pub trait ChainAdapter: Sync + Send {
/// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will never be valid and
/// may result in the peer being banned.
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool;

fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool;

Expand Down
49 changes: 24 additions & 25 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
}

fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool {
debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(),
Expand All @@ -117,7 +117,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
b.outputs().len(),
b.kernels().len(),
);
self.process_block(b, addr)
self.process_block(b, addr, was_requested)
}

fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
Expand All @@ -136,7 +136,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if cb.kern_ids().is_empty() {
// push the freshly hydrated block through the chain pipeline
match core::Block::hydrate_from(cb, vec![]) {
Ok(block) => self.process_block(block, addr),
Ok(block) => self.process_block(block, addr, false),
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
return false;
Expand All @@ -146,7 +146,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check at least the header is valid before hydrating
if let Err(e) = self
.chain()
.process_block_header(&cb.header, self.chain_opts())
.process_block_header(&cb.header, self.chain_opts(false))
{
debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind());
return !e.is_bad_data();
Expand Down Expand Up @@ -183,7 +183,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.is_ok()
{
debug!("successfully hydrated block from tx pool!");
self.process_block(block, addr)
self.process_block(block, addr, false)
} else {
if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block");
Expand All @@ -210,7 +210,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {

// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
let res = self.chain().process_block_header(&bh, self.chain_opts());
let res = self.chain().process_block_header(&bh, self.chain_opts(false));

if let &Err(ref e) = &res {
debug!("Block header {} refused by chain: {:?}", bhash, e.kind());
Expand Down Expand Up @@ -239,7 +239,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}

// try to add headers to our header chain
let res = self.chain().sync_block_headers(bhs, self.chain_opts());
let res = self.chain().sync_block_headers(bhs, self.chain_opts(true));
if let &Err(ref e) = &res {
debug!("Block headers refused by chain: {:?}", e);

Expand Down Expand Up @@ -419,7 +419,7 @@ impl NetToChainAdapter {

// pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
fn process_block(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool {
// We cannot process blocks earlier than the horizon so check for this here.
{
let head = self.chain().head().unwrap();
Expand All @@ -434,7 +434,7 @@ impl NetToChainAdapter {
let bhash = b.hash();
let previous = self.chain().get_previous_header(&b.header);

match self.chain().process_block(b, self.chain_opts()) {
match self.chain().process_block(b, self.chain_opts(was_requested)) {
Ok(_) => {
self.validate_chain(bhash);
self.check_compact();
Expand Down Expand Up @@ -587,8 +587,8 @@ impl NetToChainAdapter {
}

/// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options {
let opts = if self.sync_state.is_syncing() {
fn chain_opts(&self, was_requested: bool) -> chain::Options {
let opts = if was_requested {
chain::Options::SYNC
} else {
chain::Options::NONE
Expand Down Expand Up @@ -635,20 +635,19 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
}
}

if self.sync_state.is_syncing() {
return;
}

// If we mined the block then we want to broadcast the compact block.
// If we received the block from another node then broadcast "header first"
// to minimize network traffic.
if opts.contains(Options::MINE) {
// propagate compact block out if we mined the block
let cb: CompactBlock = b.clone().into();
self.peers().broadcast_compact_block(&cb);
} else {
// "header first" propagation if we are not the originator of this block
self.peers().broadcast_header(&b.header);
// not broadcasting blocks received through sync
if !opts.contains(chain::Options::SYNC) {
// If we mined the block then we want to broadcast the compact block.
// If we received the block from another node then broadcast "header first"
// to minimize network traffic.
if opts.contains(Options::MINE) {
// propagate compact block out if we mined the block
let cb: CompactBlock = b.clone().into();
self.peers().broadcast_compact_block(&cb);
} else {
// "header first" propagation if we are not the originator of this block
self.peers().broadcast_header(&b.header);
}
}

// Reconcile the txpool against the new block *after* we have broadcast it too our peers.
Expand Down