Skip to content

Commit

Permalink
Disconnect from outdated peers on network upgrade (#3108)
Browse files Browse the repository at this point in the history
* Replace usage of `discover::Change` with a tuple

Remove the assumption that a `Remove` variant would never be created
with type changes that allow the compiler to guarantee that assumption.

* Add a `version` field to the `Client` type

Keep track of the peer's reported protocol version.

* Create `LoadTrackedClient` type

A `peer::Client` type wrapper that implements `Load`. This helps with
the creation of a client service that has extra peer information to be
accessed without having to send requests.

* Use `LoadTrackedClient` in `initialize`

Ensure that `PeerSet` receives `LoadTrackedClient`s so that it will be
able to query the peer's protocol version later on.

* Require `LoadTrackedClient` in `PeerSet`

Replace the generic type with a concrete `LoadTrackedClient` so that we
can query its version.

* Create `MinimumPeerVersion` helper type

A type to track the current minimum protocol version for connected
peers based on the current block height.

* Use `MinimumPeerVersion` in handshakes

Keep the code to obtain the current minimum peer protocol version in a
central place.

* Add a `MinimumPeerVersion` instance to `PeerSet`

Prepare it to be able to disconnect from outdated peers based on the
current minimum supported peer protocol version.

* Disconnect from ready services for outdated peers

When the minimum peer protocol version is detected to have changed
(because of a network upgrade), remove all ready services of peers that
became outdated.

* Cancel added unready services of outdated peers

Only add an unready service if it's for a peer that has a supported
protocol version. Otherwise, add it but drop the cancel handle so that
the `UnreadyService` can execute and detect that it was cancelled.

* Avoid adding ready services for outdated peers

If a service becomes ready but it's for a connection to an outdated
peer, drop it.

* Improve comment inside `crawl_and_dial`

Describe an edge case that is also handled but was not explicit.

Co-authored-by: teor <teor@riseup.net>

* Test if calculated minimum peer version is correct

Given an arbitrary best chain tip height, check that the calculated
minimum peer protocol version is the expected value.

* Test if minimum version changes with chain tip

Apply an arbitrary list of chain tip height updates and check that for
each update the minimum peer version is calculated correctly.

* Test minimum peer version changed reports

Simulate a series of best chain tip height updates, and check for
minimum peer version updates at least once between them. Changes should
only be reported once.

* Create a `MockedClientHandle` helper type

Used to create and then track a mock `Client` instance.

* Add `MinimumPeerVersion::with_mock_chain_tip`

An extension method useful for tests, that contains some shared
boilerplate code.

* Bias arbitrary `Version`s to be in valid range

Give a 50% chance for an arbitrary `Version` to be in the range of
previously used values the Zcash network.

* Create a `PeerVersions` helper type

Helps with the creation of mocked client services with arbitrary
protocol versions.

* Create a `PeerSetGuard` helper type

An auxiliary type to a `PeerSet` instance created for testing. It keeps
track of any dummy endpoints of channels created and passed to the
`PeerSet` instance.

* Create a `PeerSetBuilder` helper type

Helps to reduce the code when preparing a `PeerSet` test instance.

* Test if outdated peers are rejected by `PeerSet`

Simulate a set of discovered peers being sent to the `PeerSet`. Ensure
that only up-to-date peers are kept by the `PeerSet` and that outdated
peers are dropped.

* Create `BlockHeightPairAcrossNetworkUpgrades` type

A helper type that allows the creation of arbitrary block height pairs,
where one value is before and the other is at or after the activation
height of an arbitrary network upgrade.

* Test if peers are dropped as they become outdated

Simulate a network upgrade, and check that peers that become outdated
are dropped by the `PeerSet`.

* Remove dbg! macros

Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
jvff and teor2345 authored Dec 9, 2021
1 parent c55753d commit 0ad89f2
Show file tree
Hide file tree
Showing 16 changed files with 1,007 additions and 128 deletions.
10 changes: 8 additions & 2 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ pub const EWMA_DEFAULT_RTT: Duration = Duration::from_secs(REQUEST_TIMEOUT.as_se
///
/// This should be much larger than the `SYNC_RESTART_TIMEOUT`, so we choose
/// better peers when we restart the sync.
pub const EWMA_DECAY_TIME: Duration = Duration::from_secs(200);
pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND;

/// The number of nanoseconds in one second.
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;

lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
Expand Down Expand Up @@ -279,7 +282,10 @@ mod tests {
assert!(EWMA_DEFAULT_RTT > REQUEST_TIMEOUT,
"The default EWMA RTT should be higher than the request timeout, so new peers are required to prove they are fast, before we prefer them to other peers.");

assert!(EWMA_DECAY_TIME > REQUEST_TIMEOUT,
let request_timeout_nanos = REQUEST_TIMEOUT.as_secs_f64()
+ f64::from(REQUEST_TIMEOUT.subsec_nanos()) * NANOS_PER_SECOND;

assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos,
"The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA.");
}

Expand Down
13 changes: 12 additions & 1 deletion zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@ mod connector;
mod error;
/// Performs peer handshakes.
mod handshake;
/// Tracks the load on a `Client` service.
mod load_tracked_client;
/// Watches for chain tip height updates to determine the minimum support peer protocol version.
mod minimum_peer_version;

use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::ClientRequest;

use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
pub use load_tracked_client::LoadTrackedClient;
pub use minimum_peer_version::MinimumPeerVersion;
8 changes: 7 additions & 1 deletion zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use futures::{
};
use tower::Service;

use crate::protocol::internal::{Request, Response};
use crate::protocol::{
external::types::Version,
internal::{Request, Response},
};

use super::{ErrorSlot, PeerError, SharedPeerError};

Expand All @@ -28,6 +31,9 @@ pub struct Client {
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,

/// The peer connection's protocol version.
pub(crate) version: Version,
}

/// A message from the `peer::Client` to the `peer::Server`.
Expand Down
6 changes: 3 additions & 3 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use futures::prelude::*;
use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};
Expand Down Expand Up @@ -57,7 +57,7 @@ where
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
type Response = Change<SocketAddr, Client>;
type Response = (SocketAddr, Client);
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
Expand Down Expand Up @@ -86,7 +86,7 @@ where
connection_tracker,
})
.await?;
Ok(Change::Insert(addr, client))
Ok((addr, client))
}
.instrument(connector_span)
.boxed()
Expand Down
20 changes: 12 additions & 8 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use zebra_chain::{
use crate::{
constants,
meta_addr::MetaAddrChange,
peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError},
peer::{
Client, ClientRequest, Connection, ErrorSlot, HandshakeError, MinimumPeerVersion, PeerError,
},
peer_set::ConnectionTracker,
protocol::{
external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
Expand Down Expand Up @@ -59,7 +61,7 @@ pub struct Handshake<S, C = NoChainTip> {
our_services: PeerServices,
relay: bool,
parent_span: Span,
latest_chain_tip: C,
minimum_peer_version: MinimumPeerVersion<C>,
}

/// The peer address that we are handshaking with.
Expand Down Expand Up @@ -420,6 +422,8 @@ where
let user_agent = self.user_agent.unwrap_or_else(|| "".to_string());
let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
let relay = self.relay.unwrap_or(false);
let network = config.network;
let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, network);

Ok(Handshake {
config,
Expand All @@ -431,7 +435,7 @@ where
our_services,
relay,
parent_span: Span::current(),
latest_chain_tip: self.latest_chain_tip,
minimum_peer_version,
})
}
}
Expand Down Expand Up @@ -473,7 +477,7 @@ pub async fn negotiate_version(
user_agent: String,
our_services: PeerServices,
relay: bool,
latest_chain_tip: impl ChainTip,
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
Expand Down Expand Up @@ -589,8 +593,7 @@ pub async fn negotiate_version(

// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let height = latest_chain_tip.best_tip_height();
let min_version = Version::min_remote_for_height(config.network, height);
let min_version = minimum_peer_version.current();
if remote_version < min_version {
debug!(
remote_ip = ?their_addr,
Expand Down Expand Up @@ -716,7 +719,7 @@ where
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;
let latest_chain_tip = self.latest_chain_tip.clone();
let minimum_peer_version = self.minimum_peer_version.clone();

let fut = async move {
debug!(
Expand Down Expand Up @@ -747,7 +750,7 @@ where
user_agent,
our_services,
relay,
latest_chain_tip,
minimum_peer_version,
),
)
.await??;
Expand Down Expand Up @@ -792,6 +795,7 @@ where
shutdown_tx: Some(shutdown_tx),
server_tx: server_tx.clone(),
error_slot: slot.clone(),
version: remote_version,
};

let (peer_tx, peer_rx) = peer_conn.split();
Expand Down
71 changes: 71 additions & 0 deletions zebra-network/src/peer/load_tracked_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! A peer connection service wrapper type to handle load tracking and provide access to the
//! reported protocol version.

use std::task::{Context, Poll};

use tower::{
load::{Load, PeakEwma},
Service,
};

use crate::{
constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT},
peer::Client,
protocol::external::types::Version,
};

/// A client service wrapper that keeps track of its load.
///
/// It also keeps track of the peer's reported protocol version.
pub struct LoadTrackedClient {
service: PeakEwma<Client>,
version: Version,
}

/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service.
impl From<Client> for LoadTrackedClient {
fn from(client: Client) -> Self {
let version = client.version;

let service = PeakEwma::new(
client,
EWMA_DEFAULT_RTT,
EWMA_DECAY_TIME_NANOS,
tower::load::CompleteOnResponse::default(),
);

LoadTrackedClient { service, version }
}
}

impl LoadTrackedClient {
/// Retrieve the peer's reported protocol version.
pub fn version(&self) -> Version {
self.version
}
}

impl<Request> Service<Request> for LoadTrackedClient
where
Client: Service<Request>,
{
type Response = <Client as Service<Request>>::Response;
type Error = <Client as Service<Request>>::Error;
type Future = <PeakEwma<Client> as Service<Request>>::Future;

fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(context)
}

fn call(&mut self, request: Request) -> Self::Future {
self.service.call(request)
}
}

impl Load for LoadTrackedClient {
type Metric = <PeakEwma<Client> as Load>::Metric;

fn load(&self) -> Self::Metric {
self.service.load()
}
}
83 changes: 83 additions & 0 deletions zebra-network/src/peer/minimum_peer_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use zebra_chain::{chain_tip::ChainTip, parameters::Network};

use crate::protocol::external::types::Version;

#[cfg(any(test, feature = "proptest-impl"))]
mod tests;

/// A helper type to monitor the chain tip in order to determine the minimum peer protocol version
/// that is currently supported.
pub struct MinimumPeerVersion<C> {
network: Network,
chain_tip: C,
current_minimum: Version,
has_changed: bool,
}

impl<C> MinimumPeerVersion<C>
where
C: ChainTip,
{
/// Create a new [`MinimumPeerVersion`] to track the minimum supported peer protocol version
/// for the current `chain_tip` on the `network`.
pub fn new(chain_tip: C, network: Network) -> Self {
MinimumPeerVersion {
network,
chain_tip,
current_minimum: Version::min_remote_for_height(network, None),
has_changed: true,
}
}

/// Check if the minimum supported peer version has changed since the last time this was
/// called.
///
/// The first call returns the current minimum version, and subsequent calls return [`None`]
/// until the minimum version changes. When that happens, the next call returns the new minimum
/// version, and subsequent calls return [`None`] again until the minimum version changes once
/// more.
pub fn changed(&mut self) -> Option<Version> {
self.update();

if self.has_changed {
self.has_changed = false;
Some(self.current_minimum)
} else {
None
}
}

/// Retrieve the current minimum supported peer protocol version.
pub fn current(&mut self) -> Version {
self.update();
self.current_minimum
}

/// Check the current chain tip height to determine the minimum peer version, and detect if it
/// has changed.
fn update(&mut self) {
let height = self.chain_tip.best_tip_height();
let new_minimum = Version::min_remote_for_height(self.network, height);

if self.current_minimum != new_minimum {
self.current_minimum = new_minimum;
self.has_changed = true;
}
}
}

/// A custom [`Clone`] implementation to ensure that the first call to
/// [`MinimumPeerVersion::changed`] after the clone will always return the current version.
impl<C> Clone for MinimumPeerVersion<C>
where
C: Clone,
{
fn clone(&self) -> Self {
MinimumPeerVersion {
network: self.network,
chain_tip: self.chain_tip.clone(),
current_minimum: self.current_minimum,
has_changed: true,
}
}
}
57 changes: 57 additions & 0 deletions zebra-network/src/peer/minimum_peer_version/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use tokio::sync::watch;

use zebra_chain::{block, chain_tip::ChainTip, parameters::Network, transaction};

use super::MinimumPeerVersion;

#[cfg(test)]
mod prop;

/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally.
#[derive(Clone)]
pub struct MockChainTip {
best_tip_height: watch::Receiver<Option<block::Height>>,
}

impl MockChainTip {
/// Create a new [`MockChainTip`].
///
/// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip
/// height.
///
/// Initially, the best tip height is [`None`].
pub fn new() -> (Self, watch::Sender<Option<block::Height>>) {
let (sender, receiver) = watch::channel(None);

let mock_chain_tip = MockChainTip {
best_tip_height: receiver,
};

(mock_chain_tip, sender)
}
}

impl ChainTip for MockChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
*self.best_tip_height.borrow()
}

fn best_tip_hash(&self) -> Option<block::Hash> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}

fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}
}

impl MinimumPeerVersion<MockChainTip> {
pub fn with_mock_chain_tip(network: Network) -> (Self, watch::Sender<Option<block::Height>>) {
let (chain_tip, best_tip_height) = MockChainTip::new();
let minimum_peer_version = MinimumPeerVersion::new(chain_tip, network);

(minimum_peer_version, best_tip_height)
}
}
Loading

0 comments on commit 0ad89f2

Please sign in to comment.