Skip to content

Commit

Permalink
Halborn 2023 02 20 (#6297)
Browse files Browse the repository at this point in the history
* Limit version user agents to 256 bytes, rather than 2MB, needs failure tests

* Limit all inv messages to 50,000 entries, existing tests cover this

* Limit reject message strings based on network protocol, needs success and failure tests

* Catch up as fast as possible if inventory rotation is delayed, existing tests cover this

* Truncate inv channel hashes to 1000, needs success and failure tests

* Limit inv registry size to 4 MB, needs over-limit tests for inv and addr

* Test size constraints on version user agent, reject command, and reject reason (#13)

* Test inventory registry memory limits for hashes and peers (#14)

Co-authored-by: Deirdre Connolly <durumcrustulum@gmail.com>

---------

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: Arya <aryasolhi@gmail.com>
  • Loading branch information
3 people authored Mar 13, 2023
1 parent 6ed1d49 commit 47cf0f4
Show file tree
Hide file tree
Showing 9 changed files with 531 additions and 36 deletions.
5 changes: 3 additions & 2 deletions zebra-chain/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ pub use error::SerializationError;
pub use read_zcash::ReadZcashExt;
pub use write_zcash::WriteZcashExt;
pub use zcash_deserialize::{
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count, TrustedPreallocate,
ZcashDeserialize, ZcashDeserializeInto,
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count,
zcash_deserialize_string_external_count, TrustedPreallocate, ZcashDeserialize,
ZcashDeserializeInto,
};
pub use zcash_serialize::{
zcash_serialize_bytes, zcash_serialize_bytes_external_count, zcash_serialize_empty_list,
Expand Down
23 changes: 20 additions & 3 deletions zebra-chain/src/serialization/zcash_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,28 @@ pub fn zcash_deserialize_bytes_external_count<R: io::Read>(
Ok(vec)
}

/// Reads `external_byte_count` bytes from `reader`, and decodes them as a UTF-8 [`String`].
///
/// This is `zcash_deserialize_external_count`, specialised for [`String`].
/// The external count is a number of bytes, not a number of UTF-8 characters.
///
/// Specialising for strings allows us to optimize the inner loop into a single call
/// to `read_exact()`.
///
/// The `zcash_` prefix alerts the reader that the serialization in use is
/// consensus-critical serialization, rather than some other kind of serialization.
///
/// # Errors
///
/// Returns any error produced while reading the bytes, or a
/// `SerializationError::Parse` error if the bytes are not valid UTF-8.
pub fn zcash_deserialize_string_external_count<R: io::Read>(
    external_byte_count: usize,
    reader: R,
) -> Result<String, SerializationError> {
    // Read the raw bytes first, so UTF-8 validation happens once over the whole buffer.
    let raw_bytes = zcash_deserialize_bytes_external_count(external_byte_count, reader)?;

    match String::from_utf8(raw_bytes) {
        Ok(decoded) => Ok(decoded),
        Err(_) => Err(SerializationError::Parse("invalid utf-8")),
    }
}

/// Read a Bitcoin-encoded UTF-8 string.
impl ZcashDeserialize for String {
fn zcash_deserialize<R: io::Read>(reader: R) -> Result<Self, SerializationError> {
let bytes: Vec<_> = Vec::zcash_deserialize(reader)?;
String::from_utf8(bytes).map_err(|_| SerializationError::Parse("invalid utf-8"))
fn zcash_deserialize<R: io::Read>(mut reader: R) -> Result<Self, SerializationError> {
let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
zcash_deserialize_string_external_count(byte_count.into(), reader)
}
}

Expand Down
130 changes: 117 additions & 13 deletions zebra-network/src/peer_set/inventory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
use std::{
collections::HashMap,
convert::TryInto,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};

use futures::{FutureExt, Stream, StreamExt};
use indexmap::IndexMap;
use tokio::{
sync::broadcast,
time::{self, Instant},
Expand All @@ -35,6 +34,40 @@ pub mod update;
#[cfg(test)]
mod tests;

/// The maximum number of inventory hashes we will track in each inventory map,
/// and the maximum number of hashes we will keep from a single peer inventory change.
///
/// # Security
///
/// This limits known memory denial of service attacks like <https://invdos.net/> to a total of:
/// ```text
/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
/// ```
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of hashes for available inventory.
///
/// But we want to be able to track a significant amount of missing inventory,
/// to limit queries for globally missing inventory.
//
// TODO: split this into available (25) and missing (1000 or more?)
pub const MAX_INV_PER_MAP: usize = 1000;

/// The maximum number of peers we will track for each inventory hash.
///
/// When a hash goes over this limit, its oldest peer entry is removed.
///
/// # Security
///
/// This limits known memory denial of service attacks. See [`MAX_INV_PER_MAP`] for details.
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of peers per inv for available inventory.
///
/// But we want to be able to track missing inventory for almost all our peers,
/// so we only query a few peers for inventory that is genuinely missing from the network.
//
// TODO: split this into available (25) and missing (70)
pub const MAX_PEERS_PER_INV: usize = 70;

/// A peer inventory status, which tracks a hash for both available and missing inventory.
pub type InventoryStatus<T> = InventoryResponse<T, T>;

Expand All @@ -59,10 +92,12 @@ type InventoryMarker = InventoryStatus<()>;
pub struct InventoryRegistry {
/// Map tracking the latest inventory status from the current interval
/// period.
current: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
//
// TODO: split maps into available and missing, so we can limit them separately.
current: IndexMap<InventoryHash, IndexMap<SocketAddr, InventoryMarker>>,

/// Map tracking inventory statuses from the previous interval period.
prev: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
prev: IndexMap<InventoryHash, IndexMap<SocketAddr, InventoryMarker>>,

/// Stream of incoming inventory statuses to register.
inv_stream: Pin<
Expand Down Expand Up @@ -99,7 +134,17 @@ impl InventoryChange {
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

// # Security
//
// Don't send more hashes than we're going to store.
// It doesn't matter which hashes we choose, because this is an efficiency optimisation.
//
// This limits known memory denial of service attacks to:
// `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
hashes.truncate(MAX_INV_PER_MAP);

let hashes = hashes.try_into().ok();

hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
Expand All @@ -110,7 +155,14 @@ impl InventoryChange {
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

// # Security
//
// Don't send more hashes than we're going to store.
// It doesn't matter which hashes we choose, because this is an efficiency optimisation.
hashes.truncate(MAX_INV_PER_MAP);

let hashes = hashes.try_into().ok();

hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
Expand Down Expand Up @@ -149,8 +201,15 @@ impl InventoryRegistry {

// Don't do an immediate rotation, current and prev are already empty.
let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
// SECURITY: if the rotation time is late, delay future rotations by the same amount
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
// # Security
//
// If the rotation time is late, execute as many ticks as needed to catch up.
// This is a tradeoff between memory usage and quickly accessing remote data
// under heavy load. Bursting prioritises lower memory usage.
//
// Skipping or delaying could keep peer inventory in memory for a longer time,
// further increasing memory load or delays due to virtual memory swapping.
interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);

Self {
current: Default::default(),
Expand Down Expand Up @@ -206,6 +265,17 @@ impl InventoryRegistry {
.is_some()
}

/// Iterates over every tracked inventory hash, paired with the peer statuses stored for it.
///
/// Entries from the current map are yielded first, followed by entries from the
/// previously rotated map. A hash that is present in both maps is yielded once per map,
/// so the same hash can appear more than once.
//
// NOTE(review): presumably allowed as dead code because it is only called from tests;
// confirm against the other callers.
#[allow(dead_code)]
pub fn status_hashes(
    &self,
) -> impl Iterator<Item = (&InventoryHash, &IndexMap<SocketAddr, InventoryMarker>)> {
    let current_statuses = self.current.iter();
    let previous_statuses = self.prev.iter();

    current_statuses.chain(previous_statuses)
}

/// Returns a future that polls once for new registry updates.
#[allow(dead_code)]
pub fn update(&mut self) -> Update {
Expand All @@ -219,8 +289,19 @@ impl InventoryRegistry {
/// - rotates the inventory maps based on interval events
/// - drains the inv_stream channel and registers all advertised inventory
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> {
// Correctness: Registers the current task for wakeup when the timer next becomes ready.
while Pin::new(&mut self.interval).poll_next(cx).is_ready() {
// # Correctness
//
// Registers the current task for wakeup when the timer next becomes ready.
//
// # Security
//
// Only rotate one inventory per peer request, to give the next inventory
// time to gather some peer advertisements. This is a tradeoff between
// memory usage and quickly accessing remote data under heavy load.
//
// This prevents a burst edge case where all inventory is emptied after
// two interval ticks are delayed.
if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
self.rotate();
}

Expand Down Expand Up @@ -274,13 +355,13 @@ impl InventoryRegistry {
"unexpected inventory type: {inv:?} from peer: {addr:?}",
);

let current = self.current.entry(inv).or_default();
let hash_peers = self.current.entry(inv).or_default();

// # Security
//
// Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
// and funnel multiple failing requests to themselves.
if let Some(old_status) = current.get(&addr) {
if let Some(old_status) = hash_peers.get(&addr) {
if old_status.is_missing() && new_status.is_available() {
debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
continue;
Expand All @@ -295,14 +376,37 @@ impl InventoryRegistry {
);
}

let replaced_status = current.insert(addr, new_status);
let replaced_status = hash_peers.insert(addr, new_status);

debug!(
?new_status,
?replaced_status,
?addr,
?inv,
"inserted new status"
);

// # Security
//
// Limit the number of stored peers per hash, removing the oldest entries,
// because newer entries are likely to be more relevant.
//
// TODO: do random or weighted random eviction instead?
if hash_peers.len() > MAX_PEERS_PER_INV {
// Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
hash_peers.shift_remove_index(0);
}

// # Security
//
// Limit the number of stored inventory hashes, removing the oldest entries,
// because newer entries are likely to be more relevant.
//
// TODO: do random or weighted random eviction instead?
if self.current.len() > MAX_INV_PER_MAP {
// Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
self.current.shift_remove_index(0);
}
}
}

Expand Down
102 changes: 100 additions & 2 deletions zebra-network/src/peer_set/inventory_registry/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
//! Fixed test vectors for the inventory registry.
use zebra_chain::block;
use std::{cmp::min, net::SocketAddr};

use zebra_chain::{block, serialization::AtLeastOne, transaction};

use crate::{
peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus},
peer_set::inventory_registry::{
tests::new_inv_registry, InventoryMarker, InventoryStatus, MAX_INV_PER_MAP,
MAX_PEERS_PER_INV,
},
protocol::external::InventoryHash,
};

Expand Down Expand Up @@ -182,3 +187,96 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}

/// Check inventory registration limits for available inventory, then for missing inventory.
#[tokio::test]
async fn inv_registry_limit() {
    let statuses = [InventoryMarker::Available(()), InventoryMarker::Missing(())];

    for status in statuses {
        inv_registry_limit_for(status).await;
    }
}

/// Check the inventory registration limits for `status`.
///
/// First registers more than [`MAX_INV_PER_MAP`] distinct hashes from a single peer,
/// then registers a single hash from more than [`MAX_PEERS_PER_INV`] distinct peers.
/// After every registration, the tracked counts must stay within the limits.
async fn inv_registry_limit_for(status: InventoryMarker) {
    let shared_hash = InventoryHash::Block(block::Hash([0xbb; 32]));
    let shared_peer: SocketAddr = "1.1.1.1:1"
        .parse()
        .expect("unexpected invalid peer address");

    let (mut inv_registry, inv_stream_tx) = new_inv_registry();

    // Part 1: the number of tracked hashes is limited.
    for hash_count in 0..(MAX_INV_PER_MAP + 10) {
        // Build a unique 32-byte hash from the counter's native-endian bytes, zero-padded.
        let counter_bytes = hash_count.to_ne_bytes();
        let mut hash_bytes = [0u8; 32];
        hash_bytes[..counter_bytes.len()].copy_from_slice(&counter_bytes);
        let unique_hash = InventoryHash::Tx(transaction::Hash(hash_bytes));

        let change = status.map(|()| (AtLeastOne::from_one(unique_hash), shared_peer));

        let receiver_count = inv_stream_tx
            .send(change)
            .expect("unexpected failed inventory status send");

        assert_eq!(receiver_count, 1);

        inv_registry
            .update()
            .await
            .expect("unexpected dropped registry sender channel");

        // The newest hash is always tracked, under the status that was just sent.
        let (expected_advertising, expected_missing) = if status.is_available() {
            (1, 0)
        } else {
            (0, 1)
        };

        assert_eq!(
            inv_registry.advertising_peers(unique_hash).count(),
            expected_advertising,
        );
        assert_eq!(
            inv_registry.missing_peers(unique_hash).count(),
            expected_missing,
        );

        // SECURITY: limit inventory memory usage
        assert_eq!(
            inv_registry.status_hashes().count(),
            min(hash_count + 1, MAX_INV_PER_MAP),
        );
    }

    // Part 2: the number of tracked peer addresses per hash is limited.
    let (mut inv_registry, inv_stream_tx) = new_inv_registry();

    for peer_count in 0..(MAX_PEERS_PER_INV + 10) {
        // Each iteration uses the same IP address with a different port.
        let port: u16 = peer_count.try_into().expect("fits in u16");
        let unique_peer = SocketAddr::new("2.2.2.2".parse().unwrap(), port);

        let change = status.map(|()| (AtLeastOne::from_one(shared_hash), unique_peer));

        let receiver_count = inv_stream_tx
            .send(change)
            .expect("unexpected failed inventory status send");

        assert_eq!(receiver_count, 1);

        inv_registry
            .update()
            .await
            .expect("unexpected dropped registry sender channel");

        assert_eq!(inv_registry.status_hashes().count(), 1);

        let limited_count = min(peer_count + 1, MAX_PEERS_PER_INV);

        // SECURITY: limit inventory memory usage
        let (expected_advertising, expected_missing) = if status.is_available() {
            (limited_count, 0)
        } else {
            (0, limited_count)
        };

        assert_eq!(
            inv_registry.advertising_peers(shared_hash).count(),
            expected_advertising,
        );
        assert_eq!(
            inv_registry.missing_peers(shared_hash).count(),
            expected_missing,
        );
    }
}
Loading

0 comments on commit 47cf0f4

Please sign in to comment.