Skip to content

Commit

Permalink
feat(network): send notfound when Zebra doesn't have a block or trans…
Browse files Browse the repository at this point in the history
…action
  • Loading branch information
teor2345 committed Feb 4, 2022
1 parent 579ba9a commit 0efc271
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 168 deletions.
2 changes: 1 addition & 1 deletion zebra-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub use crate::{
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::RetryLimit,
protocol::internal::{Request, Response},
protocol::internal::{Request, Response, ResponseStatus},
};

/// Types used in the definition of [`Request`] and [`Response`] messages.
Expand Down
172 changes: 102 additions & 70 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ use crate::{
peer_set::ConnectionTracker,
protocol::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response},
internal::{Request, Response, ResponseStatus},
},
BoxError,
};

use ResponseStatus::*;

mod peer_tx;

#[cfg(test)]
Expand Down Expand Up @@ -160,18 +162,11 @@ impl Handler {
) => {
// assumptions:
// - the transaction messages are sent in a single continuous batch
// - missing transaction hashes are included in a `NotFound` message
// - missing transactions are silently skipped
// (there is no `NotFound` message at the end of the batch)
if pending_ids.remove(&transaction.id) {
// we are in the middle of the continuous transaction messages
transactions.push(transaction);
if pending_ids.is_empty() {
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
Handler::TransactionsById {
pending_ids,
transactions,
}
}
} else {
// We got a transaction we didn't ask for. If the caller doesn't know any of the
// transactions, they should have sent a `NotFound` with all the hashes, rather
Expand All @@ -188,18 +183,25 @@ impl Handler {
// connection open, so the inbound service can process transactions from good
// peers (case 2).
ignored_msg = Some(Message::Tx(transaction));
if !transactions.is_empty() {
// if our peers start sending mixed solicited and unsolicited transactions,
// we should update this code to handle those responses
info!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response");
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error?
// Should we fake a NotFound response here? (#1515)
let missing_transaction_ids = pending_ids.iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
}

if ignored_msg.is_some() && transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
} else if pending_ids.is_empty() || ignored_msg.is_some() {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);

Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
)))
} else {
// Keep on waiting for more.
Handler::TransactionsById {
pending_ids,
transactions,
}
}
}
Expand All @@ -219,7 +221,7 @@ impl Handler {
//
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
// what we got so far.
let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
if missing_transaction_ids != pending_ids {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
Expand All @@ -231,13 +233,18 @@ impl Handler {
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
}

if !transactions.is_empty() {
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
if transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(missing_invs)))
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);

Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
)))
}
}
// `zcashd` returns requested blocks in a single batch of messages.
Expand All @@ -258,25 +265,19 @@ impl Handler {
if pending_hashes.remove(&block.hash()) {
// we are in the middle of the continuous block messages
blocks.push(block);
if pending_hashes.is_empty() {
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
} else {
// We got a block we didn't ask for.
//
// So either:
// 1. The response is for a previously cancelled block request.
// We should ignore that block, and wait for the actual response.
// We should treat that block as an inbound gossiped block,
// and wait for the actual response.
// 2. The peer doesn't know any of the blocks we asked for.
// We should cancel the request, so we don't hang waiting for blocks that
// will never arrive.
// 3. The peer sent an unsolicited block.
// We should ignore that block, and wait for the actual response.
// We should treat that block as an inbound gossiped block,
// and wait for the actual response.
//
// We ignore the message, so we don't desynchronize with the peer. This happens
// when we cancel a request and send a second different request, but receive a
Expand All @@ -286,15 +287,21 @@ impl Handler {
//
// Ignoring the message gives us a chance to synchronize back to the correct
// request.
//
// Peers can avoid these cascading errors by sending an explicit `notfound`.
// Zebra sends `notfound`, but `zcashd` doesn't.
ignored_msg = Some(Message::Block(block));
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}

if pending_hashes.is_empty() {
// If we got everything we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
Handler::Finished(Ok(Response::Blocks(available.collect())))
} else {
// Keep on waiting for all the blocks we wanted, until we get them or time out.
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
}
Expand All @@ -304,7 +311,7 @@ impl Handler {
pending_hashes,
blocks,
},
Message::NotFound(items),
Message::NotFound(missing_invs),
) => {
// assumptions:
// - the peer eventually returns a block or a `NotFound` entry
Expand All @@ -315,36 +322,31 @@ impl Handler {
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
let missing_blocks: HashSet<_> = items
.iter()
.filter_map(|inv| match &inv {
InventoryHash::Block(b) => Some(b),
_ => None,
})
.cloned()
.collect();
let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
if missing_blocks != pending_hashes {
trace!(?items, ?missing_blocks, ?pending_hashes);
trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
// if these errors are noisy, we should replace them with debugs
info!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
}
if missing_blocks.len() != items.len() {
trace!(?items, ?missing_blocks, ?pending_hashes);
if missing_blocks.len() != missing_invs.len() {
trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
}

if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
if blocks.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_block_hashes)))
} else {
// TODO: is it really an error if we ask for a block hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(items)))
// If we got some of what we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
let missing = pending_hashes.into_iter().map(ResponseStatus::Missing);

Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
}
}

// TODO:
// - add NotFound cases for blocks, transactions, and headers (#2726)
// - use `any(inv)` rather than `all(inv)`?
(Handler::FindBlocks, Message::Inv(items))
if items
Expand Down Expand Up @@ -1216,18 +1218,48 @@ where
}
}
Response::Transactions(transactions) => {
// Generate one tx message per transaction.
// Generate one tx message per transaction,
// then a notfound message with all the missing transaction ids.
let mut missing_ids = Vec::new();

for transaction in transactions.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
match transaction {
Available(transaction) => {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e);
return;
}
}
Missing(id) => missing_ids.push(id.into()),
}
}

if !missing_ids.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
self.fail_with(e);
return;
}
}
}
Response::Blocks(blocks) => {
// Generate one block message per block.
// Generate one tx message per block,
// then a notfound% message with all the missing block hashes.
let mut missing_hashes = Vec::new();

for block in blocks.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
match block {
Available(block) => {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e);
return;
}
}
Missing(hash) => missing_hashes.push(hash.into()),
}
}

if !missing_hashes.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
self.fail_with(e);
return;
}
Expand Down
5 changes: 4 additions & 1 deletion zebra-network/src/peer/connection/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::{
peer::{connection::Connection, ClientRequest, ErrorSlot},
protocol::external::Message,
protocol::internal::ResponseStatus,
Request, Response, SharedPeerError,
};

use ResponseStatus::*;

proptest! {
// The default value of proptest cases (256) causes this test to take more than ten seconds on
// most machines, so this reduces the value a little to reduce the test time.
Expand Down Expand Up @@ -105,7 +108,7 @@ proptest! {
);
let response = response_result.unwrap();

prop_assert_eq!(response, Response::Blocks(vec![second_block]));
prop_assert_eq!(response, Response::Blocks(vec![Available(second_block.0)]));

// Check the state after the response
let error = shared_error_slot.try_get_error();
Expand Down
Loading

0 comments on commit 0efc271

Please sign in to comment.