Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Security: Use canonical SocketAddrs to avoid duplicate peer connections, Feature: Send local listener to peers #2276

Merged
merged 17 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zebra-chain/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod arbitrary;
pub use constraint::AtLeastOne;
pub use date_time::DateTime32;
pub use error::SerializationError;
pub use read_zcash::ReadZcashExt;
pub use read_zcash::{canonical_socket_addr, ReadZcashExt};
pub use write_zcash::WriteZcashExt;
pub use zcash_deserialize::{
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count, TrustedPreallocate,
Expand Down
10 changes: 3 additions & 7 deletions zebra-chain/src/serialization/arbitrary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Arbitrary data generation for serialization proptests

use super::{read_zcash::canonical_ip_addr, DateTime32};
use super::{read_zcash::canonical_socket_addr, DateTime32};
use chrono::{TimeZone, Utc, MAX_DATETIME, MIN_DATETIME};
use proptest::{arbitrary::any, prelude::*};
use std::net::SocketAddr;
Expand Down Expand Up @@ -59,10 +59,6 @@ pub fn datetime_u32() -> impl Strategy<Value = chrono::DateTime<Utc>> {
/// Returns a random canonical Zebra `SocketAddr`.
///
/// See [`canonical_ip_addr`] for details.
pub fn canonical_socket_addr() -> impl Strategy<Value = SocketAddr> {
use SocketAddr::*;
any::<SocketAddr>().prop_map(|addr| match addr {
V4(_) => addr,
V6(v6_addr) => SocketAddr::new(canonical_ip_addr(v6_addr.ip()), v6_addr.port()),
})
pub fn canonical_socket_addr_strategy() -> impl Strategy<Value = SocketAddr> {
any::<SocketAddr>().prop_map(canonical_socket_addr)
}
90 changes: 74 additions & 16 deletions zebra-chain/src/serialization/date_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,40 +44,70 @@ impl DateTime32 {
self.into()
}

/// Returns the current time
/// Returns the current time.
///
/// # Panics
///
/// If the number of seconds since the UNIX epoch is greater than `u32::MAX`.
pub fn now() -> DateTime32 {
Utc::now()
.try_into()
.expect("unexpected out of range chrono::DateTime")
}

/// Returns the number of seconds elapsed between `earlier` and this time,
/// Returns the duration elapsed between `earlier` and this time,
/// or `None` if `earlier` is later than this time.
pub fn checked_duration_since(&self, earlier: DateTime32) -> Option<Duration32> {
self.timestamp
.checked_sub(earlier.timestamp)
.map(|seconds| Duration32 { seconds })
.map(Duration32::from)
}

/// Returns the number of seconds elapsed between `earlier` and this time,
/// Returns duration elapsed between `earlier` and this time,
/// or zero if `earlier` is later than this time.
pub fn saturating_duration_since(&self, earlier: DateTime32) -> Duration32 {
Duration32 {
seconds: self.timestamp.saturating_sub(earlier.timestamp),
}
Duration32::from(self.timestamp.saturating_sub(earlier.timestamp))
}

/// Returns the number of seconds elapsed since this time,
/// Returns the duration elapsed since this time,
/// or if this time is in the future, returns `None`.
pub fn checked_elapsed(&self) -> Option<Duration32> {
DateTime32::now().checked_duration_since(*self)
}

/// Returns the number of seconds elapsed since this time,
/// Returns the duration elapsed since this time,
/// or if this time is in the future, returns zero.
pub fn saturating_elapsed(&self) -> Duration32 {
DateTime32::now().saturating_duration_since(*self)
}

/// Returns the time that is `duration` after this time.
/// If the calculation overflows, returns `None`.
pub fn checked_add(&self, duration: Duration32) -> Option<DateTime32> {
self.timestamp
.checked_add(duration.seconds)
.map(DateTime32::from)
}

/// Returns the time that is `duration` after this time.
/// If the calculation overflows, returns `DateTime32::MAX`.
pub fn saturating_add(&self, duration: Duration32) -> DateTime32 {
DateTime32::from(self.timestamp.saturating_add(duration.seconds))
}

/// Returns the time that is `duration` before this time.
/// If the calculation underflows, returns `None`.
pub fn checked_sub(&self, duration: Duration32) -> Option<DateTime32> {
self.timestamp
.checked_sub(duration.seconds)
.map(DateTime32::from)
}

/// Returns the time that is `duration` before this time.
/// If the calculation underflows, returns `DateTime32::MIN`.
pub fn saturating_sub(&self, duration: Duration32) -> DateTime32 {
DateTime32::from(self.timestamp.saturating_sub(duration.seconds))
}
}

impl Duration32 {
Expand All @@ -87,7 +117,7 @@ impl Duration32 {
/// The latest possible `Duration32` value.
pub const MAX: Duration32 = Duration32 { seconds: u32::MAX };

/// Returns the number of seconds.
/// Returns the number of seconds in this duration.
pub fn seconds(&self) -> u32 {
self.seconds
}
Expand All @@ -101,6 +131,34 @@ impl Duration32 {
pub fn to_std(self) -> std::time::Duration {
self.into()
}

/// Returns a duration that is `duration` longer than this duration.
/// If the calculation overflows, returns `None`.
pub fn checked_add(&self, duration: Duration32) -> Option<Duration32> {
self.seconds
.checked_add(duration.seconds)
.map(Duration32::from)
}

/// Returns a duration that is `duration` longer than this duration.
/// If the calculation overflows, returns `Duration32::MAX`.
pub fn saturating_add(&self, duration: Duration32) -> Duration32 {
Duration32::from(self.seconds.saturating_add(duration.seconds))
}

/// Returns a duration that is `duration` shorter than this duration.
/// If the calculation underflows, returns `None`.
pub fn checked_sub(&self, duration: Duration32) -> Option<Duration32> {
self.seconds
.checked_sub(duration.seconds)
.map(Duration32::from)
}

/// Returns a duration that is `duration` shorter than this duration.
/// If the calculation underflows, returns `Duration32::MIN`.
pub fn saturating_sub(&self, duration: Duration32) -> Duration32 {
Duration32::from(self.seconds.saturating_sub(duration.seconds))
}
}

impl fmt::Debug for DateTime32 {
Expand Down Expand Up @@ -189,7 +247,7 @@ impl TryFrom<chrono::DateTime<Utc>> for DateTime32 {

/// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
fn try_from(value: chrono::DateTime<Utc>) -> Result<Self, Self::Error> {
Ok(Self {
timestamp: value.timestamp().try_into()?,
Expand All @@ -202,7 +260,7 @@ impl TryFrom<&chrono::DateTime<Utc>> for DateTime32 {

/// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
fn try_from(value: &chrono::DateTime<Utc>) -> Result<Self, Self::Error> {
(*value).try_into()
}
Expand All @@ -213,7 +271,7 @@ impl TryFrom<chrono::Duration> for Duration32 {

/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
fn try_from(value: chrono::Duration) -> Result<Self, Self::Error> {
Ok(Self {
seconds: value.num_seconds().try_into()?,
Expand All @@ -226,7 +284,7 @@ impl TryFrom<&chrono::Duration> for Duration32 {

/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
fn try_from(value: &chrono::Duration) -> Result<Self, Self::Error> {
(*value).try_into()
}
Expand All @@ -237,7 +295,7 @@ impl TryFrom<std::time::Duration> for Duration32 {

/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
fn try_from(value: std::time::Duration) -> Result<Self, Self::Error> {
Ok(Self {
seconds: value.as_secs().try_into()?,
Expand All @@ -250,7 +308,7 @@ impl TryFrom<&std::time::Duration> for Duration32 {

/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
///
/// Conversion fails if the number of seconds is outside the `u32` range.
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
fn try_from(value: &std::time::Duration) -> Result<Self, Self::Error> {
(*value).try_into()
}
Expand Down
17 changes: 17 additions & 0 deletions zebra-chain/src/serialization/read_zcash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,20 @@ pub fn canonical_ip_addr(v6_addr: &Ipv6Addr) -> IpAddr {
Some(_) | None => V6(*v6_addr),
}
}

/// Transform a `SocketAddr` into a canonical Zebra `SocketAddr`, converting
/// IPv6-mapped IPv4 addresses, and removing IPv6 scope IDs and flow information.
///
/// See [`canonical_ip_addr`] for detailed info on IPv6-mapped IPv4 addresses.
pub fn canonical_socket_addr(socket_addr: impl Into<SocketAddr>) -> SocketAddr {
use SocketAddr::*;

let mut socket_addr = socket_addr.into();
if let V6(v6_socket_addr) = socket_addr {
let canonical_ip = canonical_ip_addr(v6_socket_addr.ip());
// creating a new SocketAddr removes scope IDs and flow information
socket_addr = SocketAddr::new(canonical_ip, socket_addr.port());
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

socket_addr
}
70 changes: 31 additions & 39 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::{

use tracing::Span;

use zebra_chain::serialization::canonical_socket_addr;

use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};

/// A database of peer listener addresses, their advertised services, and
Expand Down Expand Up @@ -94,7 +96,7 @@ impl AddressBook {

let mut new_book = AddressBook {
by_addr: HashMap::default(),
local_listener: config.listen_addr,
local_listener: canonical_socket_addr(config.listen_addr),
span,
last_address_log: None,
};
Expand All @@ -118,6 +120,11 @@ impl AddressBook {

let addrs = addrs
.into_iter()
.map(|mut meta_addr| {
meta_addr.addr = canonical_socket_addr(meta_addr.addr);
meta_addr
})
.filter(MetaAddr::address_is_valid_for_outbound)
.map(|meta_addr| (meta_addr.addr, meta_addr));
new_book.by_addr.extend(addrs);

Expand All @@ -126,34 +133,36 @@ impl AddressBook {
}

/// Get the local listener address.
pub fn get_local_listener(&self) -> MetaAddrChange {
MetaAddr::new_local_listener(&self.local_listener)
///
/// This address contains minimal state, but it is not sanitized.
pub fn get_local_listener(&self) -> MetaAddr {
MetaAddr::new_local_listener_change(&self.local_listener)
.into_new_meta_addr()
.expect("unexpected invalid new local listener addr")
}

/// Get the contents of `self` in random order with sanitized timestamps.
pub fn sanitized(&self) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
let _guard = self.span.enter();
let mut peers = self
.peers()
.filter_map(|a| MetaAddr::sanitize(&a))

let mut peers = self.by_addr.clone();

// Unconditionally add our local listener address to the advertised peers,
// to replace any self-connection failures. The address book and change
// constructors make sure that the SocketAddr is canonical.
let local_listener = self.get_local_listener();
peers.insert(local_listener.addr, local_listener);
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

// Then sanitize and shuffle
let mut peers = peers
.values()
.filter_map(MetaAddr::sanitize)
.collect::<Vec<_>>();
peers.shuffle(&mut rand::thread_rng());
peers
}

/// Returns true if the address book has an entry for `addr`.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
let _guard = self.span.enter();
self.by_addr.contains_key(addr)
}

/// Returns the entry corresponding to `addr`, or `None` if it does not exist.
pub fn get_by_addr(&self, addr: SocketAddr) -> Option<MetaAddr> {
let _guard = self.span.enter();
self.by_addr.get(&addr).cloned()
}

/// Apply `change` to the address book, returning the updated `MetaAddr`,
/// if the change was valid.
///
Expand All @@ -162,6 +171,9 @@ impl AddressBook {
/// All changes should go through `update`, so that the address book
/// only contains valid outbound addresses.
///
/// Change addresses must be canonical `SocketAddr`s. This makes sure that
/// each address book entry has a unique IP address.
///
/// # Security
///
/// This function must apply every attempted, responded, and failed change
Expand Down Expand Up @@ -218,6 +230,7 @@ impl AddressBook {
///
/// All address removals should go through `take`, so that the address
/// book metrics are accurate.
#[allow(dead_code)]
fn take(&mut self, removed_addr: SocketAddr) -> Option<MetaAddr> {
let _guard = self.span.enter();
trace!(
Expand Down Expand Up @@ -314,14 +327,6 @@ impl AddressBook {
.cloned()
}

/// Returns an iterator that drains entries from the address book.
///
/// Removes entries in reconnection attempt then arbitrary order,
/// see [`peers`] for details.
pub fn drain(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
Drain { book: self }
}

/// Returns the number of entries in this address book.
pub fn len(&self) -> usize {
self.by_addr.len()
Expand Down Expand Up @@ -442,16 +447,3 @@ impl Extend<MetaAddrChange> for AddressBook {
}
}
}

struct Drain<'a> {
book: &'a mut AddressBook,
}

impl<'a> Iterator for Drain<'a> {
type Item = MetaAddr;

fn next(&mut self) -> Option<Self::Item> {
let next_item_addr = self.book.peers().next()?.addr;
self.book.take(next_item_addr)
}
}
6 changes: 3 additions & 3 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use serde::{de, Deserialize, Deserializer};

use zebra_chain::parameters::Network;
use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};

use crate::BoxError;

Expand Down Expand Up @@ -131,7 +131,7 @@ impl Config {
let fut = tokio::time::timeout(crate::constants::DNS_LOOKUP_TIMEOUT, fut);

match fut.await {
Ok(Ok(ips)) => Ok(ips.collect()),
Ok(Ok(ips)) => Ok(ips.map(canonical_socket_addr).collect()),
Ok(Err(e)) => {
tracing::info!(?host, ?e, "DNS error resolving peer IP address");
Err(e.into())
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<'de> Deserialize<'de> for Config {
}?;

Ok(Config {
listen_addr,
listen_addr: canonical_socket_addr(listen_addr),
network: config.network,
initial_mainnet_peers: config.initial_mainnet_peers,
initial_testnet_peers: config.initial_testnet_peers,
Expand Down
Loading