4. Avoid repeated requests to peers after partial responses or errors (#3505)

* fix(network): split synthetic NotFoundRegistry from message NotFoundResponse

* docs(network): Improve `notfound` message documentation

* refactor(network): Rename MustUseOneshotSender to MustUseClientResponseSender

```sh
fastmod MustUseOneshotSender MustUseClientResponseSender zebra*
```

* docs(network): fix a comment typo

* refactor(network): remove generics from MustUseClientResponseSender

* refactor(network): add an inventory collector to Client, but don't use it yet

* feat(network): register missing peer responses as missing inventory

We register this missing inventory based on peer responses,
or connection errors or timeouts.

Inbound message inventory tracking requires peers to send `notfound` messages.
But `zcashd` skips `notfound` for blocks, so we can't rely on peer messages.
This missing inventory tracking works regardless of peer `notfound` messages.

* refactor(network): rename ResponseStatus to InventoryResponse

```sh
fastmod ResponseStatus InventoryResponse zebra*
```

* refactor(network): rename InventoryStatus::inner() to to_inner()

* fix(network): remove a redundant runtime.enter() in a test

* doc(network): the exact time used to filter outbound peers doesn't matter

* fix(network): handle block requests slightly more efficiently

* doc(network): fix a typo

* fmt(network): `cargo fmt` after rename ResponseStatus to InventoryResponse

* doc(test): clarify some test comments

* test(network): test synthetic notfound from connection errors and peer inventory routing

* test(network): improve inbound test diagnostics

* feat(network): add a proptest-impl feature to zebra-network

* feat(network): add a test-only connect_isolated_with_inbound function

* test(network): allow a response on the isolated peer test connection

* test(network): fix failures in test synthetic notfound

* test(network): Simplify SharedPeerError test assertions

* test(network): test synthetic notfound from partially successful requests

* test(network): MissingInventoryCollector ignores local NotFoundRegistry errors

* fix(network): decrease the inventory rotation interval

This stops us waiting 3-4 sync resets (4 minutes) before we retry a missing block.

Now we wait 1-2 sync resets (2 minutes), which is still a reasonable rate limit.
This should speed up syncing near the tip, and on testnet.

* fmt(network): cargo fmt --all

* cleanup(network): remove unnecessary allow(dead_code)

* cleanup(network): stop importing the whole sync module into tests

* doc(network): clarify syncer inventory retry constraint

* doc(network): add a TODO for a fix to ensure API behaviour remains consistent

* doc(network): fix a function doc typo

* doc(network): clarify how we handle peers that don't send `notfound`

* docs(network): clarify a test comment

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
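
The missing-inventory tracking described in the commit message above can be sketched roughly as follows. This is a minimal, std-only illustration under stated assumptions, not Zebra's implementation: `InvHash`, `PeerId`, `RequestOutcome`, and `MissingInventory` are hypothetical names that do not appear in this commit. The point it illustrates is that anything a peer was asked for and did not deliver is recorded as missing, whether or not the peer sent a `notfound` message.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-ins for an inventory hash and a peer address.
type InvHash = [u8; 4];
type PeerId = &'static str;

/// How a request to a peer ended (illustrative, not Zebra's real types).
enum RequestOutcome {
    /// The peer returned some of the requested items, but not all of them.
    Partial { found: Vec<InvHash> },
    /// The connection failed or timed out before any item arrived.
    Failed,
}

/// Records which peers are known not to have which inventory.
#[derive(Default)]
struct MissingInventory {
    missing: HashMap<InvHash, HashSet<PeerId>>,
}

impl MissingInventory {
    /// Marks every requested item that the peer did not deliver as missing,
    /// without relying on an explicit `notfound` message from the peer.
    fn register(&mut self, peer: PeerId, requested: &[InvHash], outcome: RequestOutcome) {
        let found: HashSet<InvHash> = match outcome {
            RequestOutcome::Partial { found } => found.into_iter().collect(),
            RequestOutcome::Failed => HashSet::new(),
        };

        for hash in requested.iter().filter(|h| !found.contains(*h)) {
            self.missing.entry(*hash).or_default().insert(peer);
        }
    }

    /// Returns true if this peer should be skipped when requesting `hash`.
    fn is_missing(&self, peer: PeerId, hash: &InvHash) -> bool {
        self.missing
            .get(hash)
            .map_or(false, |peers| peers.contains(&peer))
    }
}
```

A router built on a structure like this could avoid re-sending a request to a peer that just failed to answer it, which is the behaviour the commit title describes.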
3 people authored Feb 15, 2022
1 parent b4d7080 commit a4dd3b7
Showing 37 changed files with 1,022 additions and 236 deletions.
6 changes: 3 additions & 3 deletions zebra-chain/Cargo.toml
@@ -55,7 +55,7 @@ zcash_note_encryption = { git = "https://github.com/ZcashFoundation/librustzcash
zcash_primitives = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" }
zcash_history = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" }

proptest = { version = "0.10", optional = true }
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

rand = { version = "0.8", optional = true }
@@ -76,8 +76,8 @@ itertools = "0.10.3"
spandoc = "0.2"
tracing = "0.1.29"

proptest = "0.10"
proptest-derive = "0.3"
proptest = "0.10.1"
proptest-derive = "0.3.0"
rand = "0.8"
rand_chacha = "0.3"

4 changes: 2 additions & 2 deletions zebra-consensus/Cargo.toml
@@ -47,13 +47,13 @@ zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }
zebra-script = { path = "../zebra-script" }

proptest = { version = "0.10", optional = true }
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

[dev-dependencies]
color-eyre = "0.5.11"
hex = "0.4.3"
proptest = "0.10"
proptest = "0.10.1"
proptest-derive = "0.3.0"
rand07 = { package = "rand", version = "0.7" }
spandoc = "0.2"
10 changes: 8 additions & 2 deletions zebra-network/Cargo.toml
@@ -10,6 +10,7 @@ edition = "2021"
[features]
default = []
tor = ["arti-client", "tor-rtcompat"]
proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"]

[dependencies]
bitflags = "1.2"
@@ -40,11 +41,16 @@ tracing-error = { version = "0.1.2", features = ["traced-error"] }
arti-client = { version = "0.0.2", optional = true }
tor-rtcompat = { version = "0.0.2", optional = true }

# proptest dependencies
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

zebra-chain = { path = "../zebra-chain" }

[dev-dependencies]
proptest = "0.10"
proptest-derive = "0.3"
proptest = "0.10.1"
proptest-derive = "0.3.0"

static_assertions = "1.1.0"
tokio = { version = "1.16.1", features = ["test-util"] }
toml = "0.5"
23 changes: 23 additions & 0 deletions zebra-network/src/constants.rs
@@ -95,6 +95,11 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);
/// specific manner that matches up with this math.
pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20);

/// Zebra rotates its peer inventory registry every time this interval elapses.
///
/// After 2 of these intervals, Zebra's local available and missing inventory entries expire.
pub const INVENTORY_ROTATION_INTERVAL: Duration = Duration::from_secs(53);

/// The default peer address crawler interval.
///
/// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT)
@@ -309,6 +314,8 @@ mod tests {

use std::convert::TryFrom;

use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING;

use super::*;

/// This assures that the `Duration` value we are computing for
@@ -394,4 +401,20 @@ mod tests {
"the address book limit should actually be used"
);
}

/// Make sure inventory registry rotation is consistent with the target block interval.
#[test]
fn ensure_inventory_rotation_consistent() {
zebra_test::init();

assert!(
INVENTORY_ROTATION_INTERVAL
< Duration::from_secs(
POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("non-negative"),
),
"we should expire inventory every time 1-2 new blocks get generated"
);
}
}
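
The expiry rule in the doc comment above (entries expire after 2 rotation intervals) is what a two-generation registry would produce. The sketch below is a std-only illustration of that idea, not code from this commit: `RotatingRegistry`, its methods, and `entry_lifetime_bounds` are hypothetical names, and the 75-second target spacing is an assumed value for `POST_BLOSSOM_POW_TARGET_SPACING` that this diff does not show.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Mirrors the constant added in this diff.
const INVENTORY_ROTATION_INTERVAL: Duration = Duration::from_secs(53);

/// Assumed post-Blossom target block spacing, in seconds (not shown in this diff).
const ASSUMED_TARGET_SPACING_SECS: u64 = 75;

/// A hypothetical two-generation registry: new entries go into `current`,
/// move to `previous` on the first rotation, and are dropped on the second.
#[derive(Default)]
struct RotatingRegistry {
    current: HashMap<u32, &'static str>,
    previous: HashMap<u32, &'static str>,
}

impl RotatingRegistry {
    /// Records a status for `key` in the newest generation.
    fn insert(&mut self, key: u32, status: &'static str) {
        self.current.insert(key, status);
    }

    /// Looks in the newest generation first, then in the older one.
    fn get(&self, key: u32) -> Option<&'static str> {
        self.current
            .get(&key)
            .or_else(|| self.previous.get(&key))
            .copied()
    }

    /// Intended to be called once per `INVENTORY_ROTATION_INTERVAL`.
    /// The old `previous` generation is discarded here.
    fn rotate(&mut self) {
        self.previous = std::mem::take(&mut self.current);
    }
}

/// Shortest and longest lifetime of an entry under this scheme:
/// one interval if inserted just before a rotation, two if just after.
fn entry_lifetime_bounds() -> (Duration, Duration) {
    (INVENTORY_ROTATION_INTERVAL, INVENTORY_ROTATION_INTERVAL * 2)
}
```

Under these assumptions an entry survives between 53 and 106 seconds, and a single rotation interval is shorter than one assumed block interval, which is the relationship the test above asserts.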
71 changes: 64 additions & 7 deletions zebra-network/src/isolated.rs
@@ -6,7 +6,7 @@ use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
util::{BoxService, Oneshot},
ServiceExt,
Service, ServiceExt,
};

use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
@@ -32,11 +32,11 @@ mod tests;
/// this low-level API is useful for custom network crawlers or Tor connections.
///
/// In addition to being completely isolated from all other node state, this
/// method also aims to be minimally distinguishable from other clients.
/// function also aims to be minimally distinguishable from other clients.
///
/// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
///
/// Note that this method does not implement any timeout behavior, so callers may
/// Note that this function does not implement any timeout behavior, so callers may
/// want to layer it with a timeout as appropriate for their application.
///
/// # Inputs
@@ -54,6 +54,37 @@ pub fn connect_isolated<PeerTransport>(
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });

connect_isolated_with_inbound(network, data_stream, user_agent, nil_inbound_service)
}

/// Creates an isolated Zcash peer connection using the provided data stream.
/// This function is for testing purposes only.
///
/// See [`connect_isolated`] for details.
///
/// # Additional Inputs
///
/// - `inbound_service`: a [`tower::Service`] that answers inbound requests from the connected peer.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub fn connect_isolated_with_inbound<PeerTransport, InboundService>(
network: Network,
data_stream: PeerTransport,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let config = Config {
network,
@@ -62,9 +93,7 @@ where

let handshake = peer::Handshake::builder()
.with_config(config)
.with_inbound_service(tower::service_fn(|_req| async move {
Ok::<Response, BoxError>(Response::Nil)
}))
.with_inbound_service(inbound_service)
.with_user_agent(user_agent)
.with_latest_chain_tip(NoChainTip)
.finish()
@@ -101,7 +130,35 @@ pub fn connect_isolated_tcp_direct(
addr: SocketAddr,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>> {
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });

connect_isolated_tcp_direct_with_inbound(network, addr, user_agent, nil_inbound_service)
}

/// Creates an isolated Zcash peer connection using the provided data stream.
/// This function is for testing purposes only.
///
/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tcp_direct`] for details.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub fn connect_isolated_tcp_direct_with_inbound<InboundService>(
network: Network,
addr: SocketAddr,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
tokio::net::TcpStream::connect(addr)
.err_into()
.and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent))
.and_then(move |tcp_stream| {
connect_isolated_with_inbound(network, tcp_stream, user_agent, inbound_service)
})
}
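
The refactoring in this file follows a common shape: the public function keeps its signature and delegates to a more configurable variant, passing a default handler that answers every inbound request with `Response::Nil`. The sketch below shows only that shape, using plain synchronous closures in place of `tower::Service` and skipping the handshake entirely. The `Request` and `Response` enums here are simplified stand-ins, and `connect` and `connect_with_inbound` are hypothetical names rather than the functions in this diff.

```rust
/// Simplified stand-ins for the crate's request and response types.
#[derive(Debug, PartialEq)]
enum Request {
    Ping,
    Peers,
}

#[derive(Debug, PartialEq)]
enum Response {
    Nil,
    Pong,
}

/// Configurable variant: the caller supplies the inbound handler.
fn connect_with_inbound<F>(inbound: F) -> impl FnMut(Request) -> Response
where
    F: FnMut(Request) -> Response,
{
    // A real connection would perform a handshake here. This sketch only
    // hands back the supplied handler, so inbound requests go straight to it.
    inbound
}

/// Default variant: answers every inbound request with `Response::Nil`,
/// so callers cannot change how the connection responds to its peer.
fn connect() -> impl FnMut(Request) -> Response {
    connect_with_inbound(|_req| Response::Nil)
}
```

Keeping the default in one place means a test can swap in a handler that returns real responses, while non-test callers keep the uniform `Nil` behaviour.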
46 changes: 42 additions & 4 deletions zebra-network/src/isolated/tor.rs
@@ -2,13 +2,13 @@
use std::sync::{Arc, Mutex};

use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_client::{DataStream, TorAddr, TorClient, TorClientConfig};
use tor_rtcompat::tokio::TokioRuntimeHandle;
use tower::util::BoxService;
use tower::{util::BoxService, Service};

use zebra_chain::parameters::Network;

use crate::{connect_isolated, BoxError, Request, Response};
use crate::{connect_isolated, connect_isolated_with_inbound, BoxError, Request, Response};

#[cfg(test)]
mod tests;
@@ -45,6 +45,44 @@ pub async fn connect_isolated_tor(
hostname: String,
user_agent: String,
) -> Result<BoxService<Request, Response, BoxError>, BoxError> {
let tor_stream = new_tor_stream(hostname).await?;

// Calling connect_isolated_tor_with_inbound causes lifetime issues.
//
// TODO: fix the lifetime issues, and call connect_isolated_tor_with_inbound
// so the behaviour of both functions is consistent.
connect_isolated(network, tor_stream, user_agent).await
}

/// Creates an isolated Zcash peer connection to `hostname` via Tor.
/// This function is for testing purposes only.
///
/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tor`] for details.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub async fn connect_isolated_tor_with_inbound<InboundService>(
network: Network,
hostname: String,
user_agent: String,
inbound_service: InboundService,
) -> Result<BoxService<Request, Response, BoxError>, BoxError>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let tor_stream = new_tor_stream(hostname).await?;

connect_isolated_with_inbound(network, tor_stream, user_agent, inbound_service).await
}

/// Creates a Zcash peer connection to `hostname` via Tor, and returns a tor stream.
///
/// See [`connect_isolated`] for details.
async fn new_tor_stream(hostname: String) -> Result<DataStream, BoxError> {
let addr = TorAddr::from(hostname)?;

// Initialize or clone the shared tor client instance
@@ -55,7 +93,7 @@ pub async fn connect_isolated_tor(

let tor_stream = tor_client.connect(addr, None).await?;

connect_isolated(network, tor_stream, user_agent).await
Ok(tor_stream)
}

/// Returns a new tor client instance, and updates [`SHARED_TOR_CLIENT`].
13 changes: 12 additions & 1 deletion zebra-network/src/lib.rs
@@ -150,6 +150,14 @@ mod protocol;
#[cfg(feature = "tor")]
pub use crate::isolated::tor::connect_isolated_tor;

#[cfg(all(feature = "tor", any(test, feature = "proptest-impl")))]
pub use crate::isolated::tor::connect_isolated_tor_with_inbound;

#[cfg(any(test, feature = "proptest-impl"))]
pub use crate::isolated::{
connect_isolated_tcp_direct_with_inbound, connect_isolated_with_inbound,
};

pub use crate::{
address_book::AddressBook,
config::Config,
@@ -158,10 +166,13 @@ pub use crate::{
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::RetryLimit,
protocol::internal::{Request, Response, ResponseStatus},
protocol::internal::{InventoryResponse, Request, Response},
};

/// Types used in the definition of [`Request`] and [`Response`] messages.
pub mod types {
pub use crate::{meta_addr::MetaAddr, protocol::types::PeerServices};

#[cfg(any(test, feature = "proptest-impl"))]
pub use crate::protocol::external::InventoryHash;
}
2 changes: 2 additions & 0 deletions zebra-network/src/meta_addr/arbitrary.rs
@@ -12,12 +12,14 @@ use super::{MetaAddr, MetaAddrChange, PeerServices};
///
/// This should be at least twice the number of [`PeerAddrState`]s, so the tests
/// can cover multiple transitions through every state.
#[allow(dead_code)]
pub const MAX_ADDR_CHANGE: usize = 15;

/// The largest number of random addresses we want to add to an [`AddressBook`].
///
/// This should be at least the number of [`PeerAddrState`]s, so the tests can
/// cover interactions between addresses in different states.
#[allow(dead_code)]
pub const MAX_META_ADDR: usize = 8;

impl MetaAddr {
2 changes: 1 addition & 1 deletion zebra-network/src/peer.rs
@@ -16,7 +16,7 @@ pub(crate) use client::tests::ReceiveRequestAttempt;
#[cfg(test)]
pub(crate) use handshake::register_inventory_status;

use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseClientResponseSender};

pub(crate) use client::{CancelHeartbeatTask, ClientRequest};
