Skip to content

Commit

Permalink
Sanitise eth68 announcement (paradigmxyz#6222)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
  • Loading branch information
4 people authored Jan 31, 2024
1 parent 6066402 commit 34cda3a
Show file tree
Hide file tree
Showing 16 changed files with 822 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ timeout = -4096
bad_protocol = -2147483648
failed_to_connect = -25600
dropped = -4096
bad_announcement = -1204
[peers.backoff_durations]
low = '30s'
Expand Down
140 changes: 136 additions & 4 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, B256, U128};
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};

use std::sync::Arc;
use std::{collections::HashMap, mem, sync::Arc};

#[cfg(feature = "arbitrary")]
use proptest::prelude::*;
Expand Down Expand Up @@ -159,6 +159,14 @@ impl NewPooledTransactionHashes {
}
}

/// Returns an immutable reference to transaction hashes.
pub fn hashes(&self) -> &Vec<B256> {
match self {
NewPooledTransactionHashes::Eth66(msg) => &msg.0,
NewPooledTransactionHashes::Eth68(msg) => &msg.hashes,
}
}

/// Returns a mutable reference to transaction hashes.
pub fn hashes_mut(&mut self) -> &mut Vec<B256> {
match self {
Expand Down Expand Up @@ -212,14 +220,45 @@ impl NewPooledTransactionHashes {
}
}

/// Returns an iterator over tx hashes zipped with corresponding eth68 metadata if this is
/// an eth68 message.
/// Returns an immutable reference to the inner type if this an eth68 announcement.
pub fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(msg),
}
}

/// Returns a mutable reference to the inner type if this an eth68 announcement.
pub fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(msg),
}
}

/// Returns a mutable reference to the inner type if this an eth66 announcement.
pub fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Some(msg),
NewPooledTransactionHashes::Eth68(_) => None,
}
}

/// Returns the inner type if this an eth68 announcement.
pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(mem::take(msg)),
}
}

/// Returns the inner type if this an eth66 announcement.
pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Some(mem::take(msg)),
NewPooledTransactionHashes::Eth68(_) => None,
}
}
}

impl From<NewPooledTransactionHashes> for EthMessage {
Expand Down Expand Up @@ -401,6 +440,99 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}

/// Interface for handling announcement data in filters in the transaction manager and transaction
/// pool. Note: this trait may disappear when distinction between eth66 and eth68 hashes is more
/// clearly defined, see <https://github.com/paradigmxyz/reth/issues/6148>.
pub trait HandleAnnouncement {
/// The announcement contains no entries.
fn is_empty(&self) -> bool;

/// Retain only entries for which the hash in the entry, satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool);
}

impl HandleAnnouncement for NewPooledTransactionHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f),
NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f),
}
}
}

impl HandleAnnouncement for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![];
for (i, &hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

for index in indices_to_remove.into_iter().rev() {
self.hashes.remove(index);
self.types.remove(index);
self.sizes.remove(index);
}
}
}

impl HandleAnnouncement for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![];
for (i, &hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}

for index in indices_to_remove.into_iter().rev() {
self.0.remove(index);
}
}
}

/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
pub type ValidAnnouncementData = HashMap<TxHash, Option<(u8, usize)>>;

impl HandleAnnouncement for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash, _| f(hash))
}
}

/// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68
/// metadata should have been cached already.
pub type ValidTxHashes = Vec<TxHash>;

impl HandleAnnouncement for ValidTxHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash| f(hash))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 4 additions & 1 deletion crates/net/network-api/src/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ pub enum ReputationChangeKind {
///
/// Note: this will we only used in pre-merge, pow consensus, since after no more block announcements are sent via devp2p: [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
BadBlock,
/// Peer sent a bad transaction messages. E.g. Transactions which weren't recoverable.
/// Peer sent a bad transaction message. E.g. Transactions which weren't recoverable.
BadTransactions,
/// Peer sent a bad announcement message, e.g. invalid transaction type for the configured
/// network.
BadAnnouncement,
/// Peer sent a message that included a hash or transaction that we already received from the
/// peer.
///
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ reth-provider.workspace = true
reth-rpc-types.workspace = true
reth-tokio-util.workspace = true

# ethereum
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
alloy-rlp.workspace = true

# async/futures
Expand Down Expand Up @@ -66,8 +68,6 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov
derive_more.workspace = true
schnellru.workspace = true
itertools.workspace = true

enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
tempfile = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ pub use session::{
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionLimits, SessionManager, SessionsConfig,
};
pub use transactions::{AnnouncementFilter, FilterAnnouncement, ValidateTx68};

pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};
8 changes: 8 additions & 0 deletions crates/net/network/src/peers/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const ALREADY_SEEN_TRANSACTION_REPUTATION_CHANGE: i32 = 0;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;

/// The reputation change to apply to a peer that sent a bad announcement.
// todo: current value is a hint, needs to be set properly
const BAD_ANNOUNCEMENT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT;

/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) fn is_banned_reputation(reputation: i32) -> bool {
Expand All @@ -59,6 +63,8 @@ pub struct ReputationChangeWeights {
pub failed_to_connect: Reputation,
/// Weight for [`ReputationChangeKind::Dropped`]
pub dropped: Reputation,
/// Weight for [`ReputationChangeKind::BadAnnouncement`]
pub bad_announcement: Reputation,
}

// === impl ReputationChangeWeights ===
Expand All @@ -78,6 +84,7 @@ impl ReputationChangeWeights {
ReputationChangeKind::Dropped => self.dropped.into(),
ReputationChangeKind::Reset => DEFAULT_REPUTATION.into(),
ReputationChangeKind::Other(val) => val.into(),
ReputationChangeKind::BadAnnouncement => self.bad_announcement.into(),
}
}
}
Expand All @@ -93,6 +100,7 @@ impl Default for ReputationChangeWeights {
bad_protocol: BAD_PROTOCOL_REPUTATION_CHANGE,
failed_to_connect: FAILED_TO_CONNECT_REPUTATION_CHANGE,
dropped: REMOTE_DISCONNECT_REPUTATION_CHANGE,
bad_announcement: BAD_ANNOUNCEMENT_REPUTATION_CHANGE,
}
}
}
Expand Down
25 changes: 15 additions & 10 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{EthVersion, GetPooledTransactions};
use reth_eth_wire::{EthVersion, GetPooledTransactions, HandleAnnouncement};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited};
Expand All @@ -16,7 +16,9 @@ use std::{
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};

use super::{Peer, PooledTransactions, POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE};
use super::{
AnnouncementFilter, Peer, PooledTransactions, POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
};

/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1;
Expand Down Expand Up @@ -65,6 +67,8 @@ pub(super) struct TransactionFetcher {
pub(super) unknown_hashes: LruMap<TxHash, (u8, LruCache<PeerId>), Unlimited>,
/// Size metadata for unknown eth68 hashes.
pub(super) eth68_meta: LruMap<TxHash, usize, Unlimited>,
/// Filter for valid eth68 announcements.
pub(super) filter_valid_hashes: AnnouncementFilter,
}

// === impl TransactionFetcher ===
Expand Down Expand Up @@ -313,18 +317,18 @@ impl TransactionFetcher {
self.remove_from_unknown_hashes(hashes)
}

pub(super) fn filter_unseen_hashes(
pub(super) fn filter_unseen_hashes<T: HandleAnnouncement>(
&mut self,
new_announced_hashes: &mut Vec<TxHash>,
new_announced_hashes: &mut T,
peer_id: PeerId,
is_session_active: impl Fn(PeerId) -> bool,
) {
// filter out inflight hashes, and register the peer as fallback for all inflight hashes
new_announced_hashes.retain(|hash| {
new_announced_hashes.retain_by_hash(|hash| {
// occupied entry
if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(hash) {
if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(&hash) {
// hash has been seen but is not inflight
if self.buffered_hashes.remove(hash) {
if self.buffered_hashes.remove(&hash) {
return true
}
// hash has been seen and is in flight. store peer as fallback peer.
Expand All @@ -340,13 +344,13 @@ impl TransactionFetcher {
for peer_id in ended_sessions {
backups.remove(&peer_id);
}
backups.insert(peer_id);

return false
}

let msg_version = || self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);

// vacant entry
let msg_version = || self.eth68_meta.peek(&hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);

trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
Expand Down Expand Up @@ -722,6 +726,7 @@ impl Default for TransactionFetcher {
),
unknown_hashes: LruMap::new_unlimited(),
eth68_meta: LruMap::new_unlimited(),
filter_valid_hashes: Default::default(),
}
}
}
Expand Down
Loading

0 comments on commit 34cda3a

Please sign in to comment.