Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect from outdated peers on network upgrade #3108

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a9c82b3
Replace usage of `discover::Change` with a tuple
jvff Nov 24, 2021
0852a12
Add a `version` field to the `Client` type
jvff Nov 24, 2021
3b12430
Create `LoadTrackedClient` type
jvff Nov 26, 2021
9d2e163
Use `LoadTrackedClient` in `initialize`
jvff Dec 1, 2021
9128b09
Require `LoadTrackedClient` in `PeerSet`
jvff Nov 26, 2021
a298fa3
Create `MinimumPeerVersion` helper type
jvff Nov 26, 2021
a84226b
Use `MinimumPeerVersion` in handshakes
jvff Nov 26, 2021
4cf4f82
Add a `MinimumPeerVersion` instance to `PeerSet`
jvff Nov 26, 2021
07f79a8
Disconnect from ready services for outdated peers
jvff Nov 26, 2021
cc597b7
Cancel added unready services of outdated peers
jvff Nov 27, 2021
3002239
Avoid adding ready services for outdated peers
jvff Nov 27, 2021
cb0f2ee
Improve comment inside `crawl_and_dial`
jvff Dec 2, 2021
d2a64d8
Test if calculated minimum peer version is correct
jvff Dec 3, 2021
25b6989
Test if minimum version changes with chain tip
jvff Dec 3, 2021
96597c3
Test minimum peer version changed reports
jvff Dec 3, 2021
601cc65
Create a `MockedClientHandle` helper type
jvff Dec 3, 2021
a6c1fb8
Add `MinimumPeerVersion::with_mock_chain_tip`
jvff Dec 6, 2021
399a7fc
Bias arbitrary `Version`s to be in valid range
jvff Dec 7, 2021
08424c9
Create a `PeerVersions` helper type
jvff Dec 8, 2021
1fd8247
Create a `PeerSetGuard` helper type
jvff Dec 8, 2021
dacada2
Create a `PeerSetBuilder` helper type
jvff Dec 8, 2021
8a96a3a
Test if outdated peers are rejected by `PeerSet`
jvff Dec 8, 2021
4264bd1
Create `BlockHeightPairAcrossNetworkUpgrades` type
jvff Dec 8, 2021
b07a158
Test if peers are dropped as they become outdated
jvff Dec 8, 2021
0de7402
Remove dbg! macros
teor2345 Dec 9, 2021
280943e
Merge branch 'main' into disconnect-from-outdated-peers-on-network-up…
teor2345 Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ pub const EWMA_DEFAULT_RTT: Duration = Duration::from_secs(REQUEST_TIMEOUT.as_se
///
/// This should be much larger than the `SYNC_RESTART_TIMEOUT`, so we choose
/// better peers when we restart the sync.
pub const EWMA_DECAY_TIME: Duration = Duration::from_secs(200);
pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND;

/// The number of nanoseconds in one second.
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;

lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
Expand Down Expand Up @@ -279,7 +282,10 @@ mod tests {
assert!(EWMA_DEFAULT_RTT > REQUEST_TIMEOUT,
"The default EWMA RTT should be higher than the request timeout, so new peers are required to prove they are fast, before we prefer them to other peers.");

assert!(EWMA_DECAY_TIME > REQUEST_TIMEOUT,
let request_timeout_nanos = REQUEST_TIMEOUT.as_secs_f64()
+ f64::from(REQUEST_TIMEOUT.subsec_nanos()) * NANOS_PER_SECOND;
jvff marked this conversation as resolved.
Show resolved Hide resolved

assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos,
"The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA.");
}

Expand Down
13 changes: 12 additions & 1 deletion zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@ mod connector;
mod error;
/// Performs peer handshakes.
mod handshake;
/// Tracks the load on a `Client` service.
mod load_tracked_client;
/// Watches for chain tip height updates to determine the minimum support peer protocol version.
mod minimum_peer_version;

use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::ClientRequest;

use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
pub use load_tracked_client::LoadTrackedClient;
pub use minimum_peer_version::MinimumPeerVersion;
8 changes: 7 additions & 1 deletion zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use futures::{
};
use tower::Service;

use crate::protocol::internal::{Request, Response};
use crate::protocol::{
external::types::Version,
internal::{Request, Response},
};

use super::{ErrorSlot, PeerError, SharedPeerError};

Expand All @@ -28,6 +31,9 @@ pub struct Client {
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,

/// The peer connection's protocol version.
pub(crate) version: Version,
}

/// A message from the `peer::Client` to the `peer::Server`.
Expand Down
6 changes: 3 additions & 3 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use futures::prelude::*;
use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};
Expand Down Expand Up @@ -57,7 +57,7 @@ where
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
type Response = Change<SocketAddr, Client>;
type Response = (SocketAddr, Client);
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
Expand Down Expand Up @@ -86,7 +86,7 @@ where
connection_tracker,
})
.await?;
Ok(Change::Insert(addr, client))
Ok((addr, client))
}
.instrument(connector_span)
.boxed()
Expand Down
20 changes: 12 additions & 8 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use zebra_chain::{
use crate::{
constants,
meta_addr::MetaAddrChange,
peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError},
peer::{
Client, ClientRequest, Connection, ErrorSlot, HandshakeError, MinimumPeerVersion, PeerError,
},
peer_set::ConnectionTracker,
protocol::{
external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
Expand Down Expand Up @@ -59,7 +61,7 @@ pub struct Handshake<S, C = NoChainTip> {
our_services: PeerServices,
relay: bool,
parent_span: Span,
latest_chain_tip: C,
minimum_peer_version: MinimumPeerVersion<C>,
}

/// The peer address that we are handshaking with.
Expand Down Expand Up @@ -420,6 +422,8 @@ where
let user_agent = self.user_agent.unwrap_or_else(|| "".to_string());
let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
let relay = self.relay.unwrap_or(false);
let network = config.network;
let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, network);

Ok(Handshake {
config,
Expand All @@ -431,7 +435,7 @@ where
our_services,
relay,
parent_span: Span::current(),
latest_chain_tip: self.latest_chain_tip,
minimum_peer_version,
})
}
}
Expand Down Expand Up @@ -473,7 +477,7 @@ pub async fn negotiate_version(
user_agent: String,
our_services: PeerServices,
relay: bool,
latest_chain_tip: impl ChainTip,
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
Expand Down Expand Up @@ -589,8 +593,7 @@ pub async fn negotiate_version(

// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let height = latest_chain_tip.best_tip_height();
let min_version = Version::min_remote_for_height(config.network, height);
let min_version = minimum_peer_version.current();
if remote_version < min_version {
debug!(
remote_ip = ?their_addr,
Expand Down Expand Up @@ -716,7 +719,7 @@ where
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;
let latest_chain_tip = self.latest_chain_tip.clone();
let minimum_peer_version = self.minimum_peer_version.clone();

let fut = async move {
debug!(
Expand Down Expand Up @@ -747,7 +750,7 @@ where
user_agent,
our_services,
relay,
latest_chain_tip,
minimum_peer_version,
),
)
.await??;
Expand Down Expand Up @@ -792,6 +795,7 @@ where
shutdown_tx: Some(shutdown_tx),
server_tx: server_tx.clone(),
error_slot: slot.clone(),
version: remote_version,
};

let (peer_tx, peer_rx) = peer_conn.split();
Expand Down
71 changes: 71 additions & 0 deletions zebra-network/src/peer/load_tracked_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! A peer connection service wrapper type to handle load tracking and provide access to the
//! reported protocol version.

use std::task::{Context, Poll};
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

use tower::{
load::{Load, PeakEwma},
Service,
};

use crate::{
constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT},
peer::Client,
protocol::external::types::Version,
};

/// A client service wrapper that keeps track of its load.
///
/// It also keeps track of the peer's reported protocol version.
pub struct LoadTrackedClient {
service: PeakEwma<Client>,
version: Version,
}

/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service.
impl From<Client> for LoadTrackedClient {
fn from(client: Client) -> Self {
let version = client.version;

let service = PeakEwma::new(
client,
EWMA_DEFAULT_RTT,
EWMA_DECAY_TIME_NANOS,
tower::load::CompleteOnResponse::default(),
);

LoadTrackedClient { service, version }
}
}

impl LoadTrackedClient {
/// Retrieve the peer's reported protocol version.
pub fn version(&self) -> Version {
self.version
}
}

impl<Request> Service<Request> for LoadTrackedClient
where
Client: Service<Request>,
{
type Response = <Client as Service<Request>>::Response;
type Error = <Client as Service<Request>>::Error;
type Future = <PeakEwma<Client> as Service<Request>>::Future;

fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(context)
}

fn call(&mut self, request: Request) -> Self::Future {
self.service.call(request)
}
}

impl Load for LoadTrackedClient {
type Metric = <PeakEwma<Client> as Load>::Metric;

fn load(&self) -> Self::Metric {
self.service.load()
}
}
83 changes: 83 additions & 0 deletions zebra-network/src/peer/minimum_peer_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use zebra_chain::{chain_tip::ChainTip, parameters::Network};

use crate::protocol::external::types::Version;

#[cfg(any(test, feature = "proptest-impl"))]
mod tests;

/// A helper type to monitor the chain tip in order to determine the minimum peer protocol version
/// that is currently supported.
pub struct MinimumPeerVersion<C> {
network: Network,
chain_tip: C,
current_minimum: Version,
has_changed: bool,
}

impl<C> MinimumPeerVersion<C>
where
C: ChainTip,
{
/// Create a new [`MinimumPeerVersion`] to track the minimum supported peer protocol version
/// for the current `chain_tip` on the `network`.
pub fn new(chain_tip: C, network: Network) -> Self {
MinimumPeerVersion {
network,
chain_tip,
current_minimum: Version::min_remote_for_height(network, None),
has_changed: true,
}
}

/// Check if the minimum supported peer version has changed since the last time this was
/// called.
///
/// The first call returns the current minimum version, and subsequent calls return [`None`]
/// until the minimum version changes. When that happens, the next call returns the new minimum
/// version, and subsequent calls return [`None`] again until the minimum version changes once
/// more.
pub fn changed(&mut self) -> Option<Version> {
self.update();

if self.has_changed {
self.has_changed = false;
Some(self.current_minimum)
} else {
None
}
}

/// Retrieve the current minimum supported peer protocol version.
pub fn current(&mut self) -> Version {
self.update();
self.current_minimum
}

/// Check the current chain tip height to determine the minimum peer version, and detect if it
/// has changed.
fn update(&mut self) {
let height = self.chain_tip.best_tip_height();
let new_minimum = Version::min_remote_for_height(self.network, height);

if self.current_minimum != new_minimum {
self.current_minimum = new_minimum;
self.has_changed = true;
}
}
}

/// A custom [`Clone`] implementation to ensure that the first call to
/// [`MinimumPeerVersion::changed`] after the clone will always return the current version.
impl<C> Clone for MinimumPeerVersion<C>
where
C: Clone,
{
fn clone(&self) -> Self {
MinimumPeerVersion {
network: self.network,
chain_tip: self.chain_tip.clone(),
current_minimum: self.current_minimum,
has_changed: true,
}
}
}
57 changes: 57 additions & 0 deletions zebra-network/src/peer/minimum_peer_version/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use tokio::sync::watch;

use zebra_chain::{block, chain_tip::ChainTip, parameters::Network, transaction};

use super::MinimumPeerVersion;

#[cfg(test)]
mod prop;

/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally.
#[derive(Clone)]
pub struct MockChainTip {
best_tip_height: watch::Receiver<Option<block::Height>>,
}

impl MockChainTip {
/// Create a new [`MockChainTip`].
///
/// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip
/// height.
///
/// Initially, the best tip height is [`None`].
pub fn new() -> (Self, watch::Sender<Option<block::Height>>) {
let (sender, receiver) = watch::channel(None);

let mock_chain_tip = MockChainTip {
best_tip_height: receiver,
};

(mock_chain_tip, sender)
}
}

impl ChainTip for MockChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
*self.best_tip_height.borrow()
}

fn best_tip_hash(&self) -> Option<block::Hash> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}

fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
unreachable!("Method not used in `MinimumPeerVersion` tests");
}
}

impl MinimumPeerVersion<MockChainTip> {
pub fn with_mock_chain_tip(network: Network) -> (Self, watch::Sender<Option<block::Height>>) {
let (chain_tip, best_tip_height) = MockChainTip::new();
let minimum_peer_version = MinimumPeerVersion::new(chain_tip, network);

(minimum_peer_version, best_tip_height)
}
}
Loading