Skip to content

Commit

Permalink
feat(network): register missing peer responses as missing inventory
Browse files Browse the repository at this point in the history
We register this missing inventory based on peer responses,
or connection errors or timeouts.

Inbound message inventory tracking requires peers to send `notfound` messages.
But `zcashd` skips `notfound` for blocks, so we can't rely on peer messages.
This missing inventory tracking works regardless of peer `notfound` messages.
  • Loading branch information
teor2345 committed Feb 11, 2022
1 parent 4299b39 commit c438be0
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 15 deletions.
138 changes: 124 additions & 14 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub(super) struct InProgressClientRequest {
}

/// A `oneshot::Sender` for client responses, that must be used by calling `send()`.
/// Also handles forwarding missing inventory to the inventory registry.
///
/// Panics on drop if `tx` has not been used or canceled.
/// Panics if `tx.send()` is used more than once.
Expand All @@ -144,13 +145,26 @@ pub(super) struct MustUseClientResponseSender {
/// `None` if `tx.send()` has been used.
pub tx: Option<oneshot::Sender<Result<Response, SharedPeerError>>>,

/// Used to register missing inventory in responses on `tx`,
/// Forwards missing inventory in the response to the inventory collector.
///
/// Boxed to reduce the size of containing structures.
pub missing_inv: Option<Box<MissingInventoryCollector>>,
}

/// Forwards missing inventory in the response to the inventory registry.
#[derive(Debug)]
pub(super) struct MissingInventoryCollector {
/// A clone of the original request, if it is an inventory request.
///
/// This struct is only ever created with inventory requests.
request: Request,

/// Used to register missing inventory from responses,
/// so that the peer set can route retries to other clients.
#[allow(dead_code)]
pub inv_collector: Option<broadcast::Sender<InventoryChange>>,
collector: broadcast::Sender<InventoryChange>,

/// The peer address for registering missing inventory.
pub(crate) transient_addr: Option<SocketAddr>,
transient_addr: SocketAddr,
}

impl std::fmt::Debug for Client {
Expand All @@ -171,11 +185,10 @@ impl From<ClientRequest> for InProgressClientRequest {
transient_addr,
span,
} = client_request;
InProgressClientRequest {
request,
tx: MustUseClientResponseSender::new(tx, inv_collector, transient_addr),
span,
}

let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr);

InProgressClientRequest { request, tx, span }
}
}

Expand Down Expand Up @@ -237,31 +250,41 @@ impl From<mpsc::Receiver<ClientRequest>> for ClientRequestReceiver {
}

impl MustUseClientResponseSender {
/// Returns a newly created client response sender.
/// Returns a newly created client response sender for `tx`.
///
/// If `request` or the response contains missing inventory,
/// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
pub fn new(
tx: oneshot::Sender<Result<Response, SharedPeerError>>,
request: &Request,
inv_collector: Option<broadcast::Sender<InventoryChange>>,
transient_addr: Option<SocketAddr>,
) -> Self {
Self {
tx: Some(tx),
inv_collector,
transient_addr,
missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr),
}
}

/// Forwards `response` to `tx.send()`, and marks this sender as used.
/// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`,
/// and marks this sender as used.
///
/// Panics if `tx.send()` is used more than once.
pub fn send(
mut self,
response: Result<Response, SharedPeerError>,
) -> Result<(), Result<Response, SharedPeerError>> {
// Forward any missing inventory to the registry.
if let Some(missing_inv) = self.missing_inv.take() {
missing_inv.send(&response);
}

// Forward the response to the internal requester.
self.tx
.take()
.unwrap_or_else(|| {
panic!(
"multiple uses of oneshot sender: oneshot must be used exactly once: {:?}",
"multiple uses of response sender: response must be sent exactly once: {:?}",
self
)
})
Expand Down Expand Up @@ -307,6 +330,93 @@ impl Drop for MustUseClientResponseSender {
}
}

impl MissingInventoryCollector {
/// Returns a newly created missing inventory collector, if needed.
///
/// If `request` or the response contains missing inventory,
/// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
pub fn new(
request: &Request,
inv_collector: Option<broadcast::Sender<InventoryChange>>,
transient_addr: Option<SocketAddr>,
) -> Option<Box<MissingInventoryCollector>> {
if !request.is_inventory_download() {
return None;
}

if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) {
Some(Box::new(MissingInventoryCollector {
request: request.clone(),
collector: inv_collector,
transient_addr,
}))
} else {
None
}
}

/// Forwards any missing inventory to the registry.
///
/// `zcashd` doesn't send `notfound` messages for blocks,
/// so we need to track missing blocks ourselves.
///
/// This can sometimes send duplicate missing inventory,
/// but the registry ignores duplicates anyway.
pub fn send(self, response: &Result<Response, SharedPeerError>) {
let missing_inv: HashSet<InventoryHash> = match (self.request, response) {
// Missing block hashes from partial responses.
(_, Ok(Response::Blocks(block_statuses))) => block_statuses
.iter()
.filter_map(|b| b.missing())
.map(InventoryHash::Block)
.collect(),

// Missing transaction IDs from partial responses.
(_, Ok(Response::Transactions(tx_statuses))) => tx_statuses
.iter()
.filter_map(|tx| tx.missing())
.map(|tx| tx.into())
.collect(),

// Other response types never contain missing inventory.
(_, Ok(_)) => iter::empty().collect(),

// We don't forward NotFoundRegistry errors,
// because the errors are generated locally from the registry,
// so those statuses are already in the registry.
//
// Unfortunately, we can't access the inner error variant here,
// due to TracedError.
(_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(),

// Missing inventory from other errors, including NotFoundResponse, timeouts,
// and dropped connections.
(request, Err(_)) => {
// The request either contains blocks or transactions,
// but this is a convenient way to collect them both.
let missing_blocks = request
.block_hash_inventory()
.into_iter()
.map(InventoryHash::Block);

let missing_txs = request
.transaction_id_inventory()
.into_iter()
.map(InventoryHash::from);

missing_blocks.chain(missing_txs).collect()
}
};

if let Some(missing_inv) =
InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr)
{
// if all the receivers are closed, assume we're in tests or an isolated connection
let _ = self.collector.send(missing_inv);
}
}
}

impl Client {
/// Check if this connection's heartbeat task has exited.
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
Expand Down
1 change: 0 additions & 1 deletion zebra-network/src/peer_set/inventory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl InventoryChange {
}

/// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
#[allow(dead_code)]
pub fn new_missing_multi<'a>(
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
Expand Down
26 changes: 26 additions & 0 deletions zebra-network/src/protocol/internal/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,30 @@ impl Request {
Request::MempoolTransactionIds => "MempoolTransactionIds",
}
}

/// Returns true if the request is for block or transaction inventory downloads.
pub fn is_inventory_download(&self) -> bool {
matches!(
self,
Request::BlocksByHash(_) | Request::TransactionsById(_)
)
}

/// Returns the block hash inventory downloads from the request, if any.
pub fn block_hash_inventory(&self) -> HashSet<block::Hash> {
if let Request::BlocksByHash(block_hashes) = self {
block_hashes.clone()
} else {
HashSet::new()
}
}

/// Returns the transaction ID inventory downloads from the request, if any.
pub fn transaction_id_inventory(&self) -> HashSet<UnminedTxId> {
if let Request::TransactionsById(transaction_ids) = self {
transaction_ids.clone()
} else {
HashSet::new()
}
}
}
5 changes: 5 additions & 0 deletions zebra-network/src/protocol/internal/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,9 @@ impl Response {
Response::Transactions(_) => "Transactions",
}
}

/// Returns true if the response is a block or transaction inventory download.
pub fn is_inventory_download(&self) -> bool {
matches!(self, Response::Blocks(_) | Response::Transactions(_))
}
}

0 comments on commit c438be0

Please sign in to comment.