
Commit a6e3ccf

rodrigo-o authored and SDartayet committed
chore(l1): optimize the structure for tracking known txs sent to peers in TxBroadcaster (#4675)
**Motivation**

We were seeing memory spikes of 3–10 GB during post-sync execution, most of it due to `resize_rehash` during the `add_txs` of the `TxBroadcaster`.

**Description**

This PR replaces the current hashmap of `(tx_hash, peer_id) -> Instant` with a less duplicated `tx_hash -> (peer_bitmap, last_seen)` plus a separate `peer_id -> peer_index` map. This removes orders of magnitude of entries, going from 27M to fewer than 100k. All the memory previously attributed to the TxBroadcaster has now disappeared from our memory flamegraphs.

WIP: ~I'm currently analyzing the execution of the client over the weekend; we still appear to have a memory issue.~ This turned out to be related to a change in rocksdb that enabled transactions; it didn't affect this fix.
1 parent 4d257ee commit a6e3ccf
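To make the layout change concrete, here is a minimal sketch of the before/after key spaces described above. The types are simplified placeholders (byte arrays instead of `H256`), and the structs are illustrative only; the actual fields live in `tx_broadcaster.rs` in the diff below.

```rust
use std::collections::HashMap;
use std::time::Instant;

// Simplified placeholder types for illustration; the real code uses H256 values.
type PeerId = [u8; 32];
type TxHash = [u8; 32];

// Before: one map entry per (peer, tx) pair.
// With P peers and T distinct tx hashes this can reach P * T entries.
type BroadcastedTxsPerPeer = HashMap<(PeerId, TxHash), Instant>;

// After: one entry per tx plus one small index entry per peer,
// i.e. roughly T + P entries, with peer membership packed into a bitmap.
struct PeerMask {
    bits: Vec<u64>, // bit i set => the peer with index i already knows this tx
}

struct BroadcastRecord {
    peers: PeerMask,    // which peers have already been sent / have announced this tx
    last_sent: Instant, // used to prune stale records
}

struct KnownTxs {
    known_txs: HashMap<TxHash, BroadcastRecord>, // tx_hash -> (peer bitmap, last seen)
    peer_indexer: HashMap<PeerId, u32>,          // peer_id -> bit index into PeerMask
    next_peer_idx: u32,                          // next index to hand out
}
```

With hundreds of peers, collapsing the per-peer dimension into a bitmap is what takes the entry count from tens of millions down to roughly the number of distinct transaction hashes.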

File tree: 1 file changed, +109 -15 lines

crates/networking/p2p/tx_broadcaster.rs

Lines changed: 109 additions & 15 deletions
```diff
@@ -28,20 +28,75 @@ use crate::{
 // Soft limit for the number of transaction hashes sent in a single NewPooledTransactionHashes message as per [the spec](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x080)
 const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
 
-// Amount of seconds after which we prune old entries from broadcasted_txs_per_peer (We should fine tune this)
-const PRUNE_WAIT_TIME_SECS: u64 = 300; // 5 minutes
+// Amount of seconds after which we prune broadcast records (We should fine tune this)
+const PRUNE_WAIT_TIME_SECS: u64 = 600; // 10 minutes
 
 // Amount of seconds between each prune
-const PRUNE_INTERVAL_SECS: u64 = 300; // 5 minutes
+const PRUNE_INTERVAL_SECS: u64 = 360; // 6 minutes
 
 // Amount of seconds between each broadcast
 const BROADCAST_INTERVAL_SECS: u64 = 1; // 1 second
 
+#[derive(Debug, Clone, Default)]
+struct PeerMask {
+    bits: Vec<u64>,
+}
+
+impl PeerMask {
+    #[inline]
+    // Ensure that the internal bit vector can hold the given index.
+    // If not, resize the vector.
+    fn ensure(&mut self, idx: u32) {
+        let word = (idx as usize) / 64;
+        if self.bits.len() <= word {
+            self.bits.resize(word + 1, 0);
+        }
+    }
+
+    #[inline]
+    fn is_set(&self, idx: u32) -> bool {
+        let word = (idx as usize) / 64;
+        if word >= self.bits.len() {
+            return false;
+        }
+        let bit = (idx as usize) % 64;
+        (self.bits[word] >> bit) & 1 == 1
+    }
+
+    #[inline]
+    fn set(&mut self, idx: u32) {
+        self.ensure(idx);
+        let word = (idx as usize) / 64;
+        let bit = (idx as usize) % 64;
+        self.bits[word] |= 1u64 << bit;
+    }
+}
+
+#[derive(Debug, Clone)]
+struct BroadcastRecord {
+    peers: PeerMask,
+    last_sent: Instant,
+}
+
+impl Default for BroadcastRecord {
+    fn default() -> Self {
+        Self {
+            peers: PeerMask::default(),
+            last_sent: Instant::now(),
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct TxBroadcaster {
     kademlia: Kademlia,
     blockchain: Arc<Blockchain>,
-    broadcasted_txs_per_peer: HashMap<(H256, H256), Instant>, // (peer_id, tx_hash) -> timestamp
+    // tx_hash -> broadcast record (which peers know it and when it was last sent)
+    known_txs: HashMap<H256, BroadcastRecord>,
+    // Assign each peer_id (H256) a u32 index used by PeerMask entries
+    peer_indexer: HashMap<H256, u32>,
+    // Next index to assign to a new peer
+    next_peer_idx: u32,
 }
 
 #[derive(Debug, Clone)]
@@ -66,7 +121,9 @@ impl TxBroadcaster {
         let state = TxBroadcaster {
             kademlia,
             blockchain,
-            broadcasted_txs_per_peer: HashMap::new(),
+            known_txs: HashMap::new(),
+            peer_indexer: HashMap::new(),
+            next_peer_idx: 0,
         };
 
         let server = state.clone().start();
@@ -86,12 +143,36 @@ impl TxBroadcaster {
         Ok(server)
     }
 
+    // Get or assign a unique index to the peer_id
+    #[inline]
+    fn peer_index(&mut self, peer_id: H256) -> u32 {
+        if let Some(&idx) = self.peer_indexer.get(&peer_id) {
+            idx
+        } else {
+            // We are assigning indexes sequentially, so next_peer_idx is always the next available one.
+            // self.peer_indexer.len() could be used instead of next_peer_idx but avoided here if we ever
+            // remove entries from peer_indexer in the future.
+            let idx = self.next_peer_idx;
+            // In practice we won't exceed u32::MAX (~4.29 billion) peers.
+            self.next_peer_idx += 1;
+            self.peer_indexer.insert(peer_id, idx);
+            idx
+        }
+    }
+
     fn add_txs(&mut self, txs: Vec<H256>, peer_id: H256) {
+        debug!(total = self.known_txs.len(), adding = txs.len(), peer_id = %format!("{:#x}", peer_id), "Adding transactions to known list");
+
+        if txs.is_empty() {
+            return;
+        }
+
         let now = Instant::now();
+        let peer_idx = self.peer_index(peer_id);
         for tx in txs {
-            self.broadcasted_txs_per_peer
-                .entry((peer_id, tx))
-                .insert_entry(now);
+            let record = self.known_txs.entry(tx).or_default();
+            record.peers.set(peer_idx);
+            record.last_sent = now;
         }
     }
 
@@ -127,12 +208,15 @@ impl TxBroadcaster {
             shuffled_peers.split_at(peer_sqrt.ceil() as usize);
 
         for (peer_id, mut peer_channels, capabilities) in peers_to_send_full_txs.iter().cloned() {
+            let peer_idx = self.peer_index(peer_id);
             let txs_to_send = full_txs
                 .iter()
                 .filter(|tx| {
+                    let hash = tx.hash();
                     !self
-                        .broadcasted_txs_per_peer
-                        .contains_key(&(peer_id, tx.hash()))
+                        .known_txs
+                        .get(&hash)
+                        .is_some_and(|record| record.peers.is_set(peer_idx))
                 })
                 .cloned()
                 .collect::<Vec<Transaction>>();
@@ -170,12 +254,15 @@ impl TxBroadcaster {
         peer_channels: &mut PeerChannels,
         peer_id: H256,
     ) -> Result<(), TxBroadcasterError> {
+        let peer_idx = self.peer_index(peer_id);
         let txs_to_send = txs
             .iter()
            .filter(|tx| {
+                let hash = tx.hash();
                 !self
-                    .broadcasted_txs_per_peer
-                    .contains_key(&(peer_id, tx.hash()))
+                    .known_txs
+                    .get(&hash)
+                    .is_some_and(|record| record.peers.is_set(peer_idx))
             })
             .cloned()
             .collect::<Vec<MempoolTransaction>>();
@@ -250,9 +337,16 @@ impl GenServer for TxBroadcaster {
             Self::CastMsg::PruneTxs => {
                 debug!(received = "PruneTxs");
                 let now = Instant::now();
-                self.broadcasted_txs_per_peer.retain(|_, &mut timestamp| {
-                    now.duration_since(timestamp) < Duration::from_secs(PRUNE_WAIT_TIME_SECS)
-                });
+                let before = self.known_txs.len();
+                let prune_window = Duration::from_secs(PRUNE_WAIT_TIME_SECS);
+
+                self.known_txs
+                    .retain(|_, record| now.duration_since(record.last_sent) < prune_window);
+                debug!(
+                    before = before,
+                    after = self.known_txs.len(),
+                    "Pruned old broadcasted transactions"
+                );
                 CastResponse::NoReply
             }
         }
```
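For quick experimentation, here is a self-contained sketch of the bitmap bookkeeping the diff introduces. It uses the same word/bit decomposition as `PeerMask`; the `main` function and the sample indices are made up for the demo and are not part of the PR.

```rust
// Standalone sketch of the PeerMask idea: a peer index maps to (word, bit) in a Vec<u64>.
#[derive(Debug, Default)]
struct PeerMask {
    bits: Vec<u64>,
}

impl PeerMask {
    fn set(&mut self, idx: u32) {
        let word = (idx as usize) / 64; // which u64 holds this peer's bit
        let bit = (idx as usize) % 64; // which bit inside that u64
        if self.bits.len() <= word {
            self.bits.resize(word + 1, 0); // grow lazily, only as far as needed
        }
        self.bits[word] |= 1u64 << bit;
    }

    fn is_set(&self, idx: u32) -> bool {
        let word = (idx as usize) / 64;
        let bit = (idx as usize) % 64;
        self.bits.get(word).is_some_and(|w| (w >> bit) & 1 == 1)
    }
}

fn main() {
    let mut mask = PeerMask::default();
    mask.set(3);
    mask.set(70); // crosses into a second u64 word, forcing a resize
    assert!(mask.is_set(3) && mask.is_set(70));
    assert!(!mask.is_set(4));
    // Per tracked tx, the mask costs one u64 word per 64 peer indices
    // (two words here) instead of one HashMap entry per (peer, tx) pair.
    println!("words used: {}", mask.bits.len()); // prints "words used: 2"
}
```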
