diff --git a/Cargo.lock b/Cargo.lock
index 48b09c6dc44..ebe7cbf3de5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2262,15 +2262,6 @@ dependencies = [
  "syn 1.0.109",
 ]
 
-[[package]]
-name = "lru"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03f1160296536f10c833a82dca22267d5486734230d47bf00bf435885814ba1e"
-dependencies = [
- "hashbrown 0.13.2",
-]
-
 [[package]]
 name = "lz4-sys"
 version = "1.9.4"
@@ -2964,7 +2955,6 @@ name = "massa_protocol_worker"
 version = "0.1.0"
 dependencies = [
  "crossbeam",
- "lru",
  "massa_consensus_exports",
  "massa_hash 0.1.0",
  "massa_logging",
@@ -2982,6 +2972,7 @@ dependencies = [
  "peernet",
  "rand 0.8.5",
  "rayon",
+ "schnellru",
  "serde_json",
  "serial_test 2.0.0",
  "tempfile",
diff --git a/massa-protocol-worker/Cargo.toml b/massa-protocol-worker/Cargo.toml
index e6a0a9523b2..abfe52514ab 100644
--- a/massa-protocol-worker/Cargo.toml
+++ b/massa-protocol-worker/Cargo.toml
@@ -15,7 +15,7 @@ num_enum = "0.5"
 peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
 tempfile = { version = "3.3", optional = true } # use with testing feature
 rayon = "1.7.0"
-lru = "0.10.0"
+schnellru = "0.2.1"
 
 # modules Custom
 massa_hash = { path = "../massa-hash" }
diff --git a/massa-protocol-worker/src/connectivity.rs b/massa-protocol-worker/src/connectivity.rs
index ed977a3e57c..9dacd094958 100644
--- a/massa-protocol-worker/src/connectivity.rs
+++ b/massa-protocol-worker/src/connectivity.rs
@@ -12,8 +12,8 @@ use parking_lot::RwLock;
 use peernet::transports::TcpOutConnectionConfig;
 use peernet::{peer::PeerConnectionType, transports::OutConnectionConfig};
 use std::net::SocketAddr;
+use std::sync::Arc;
 use std::{collections::HashMap, net::IpAddr};
-use std::{num::NonZeroUsize, sync::Arc};
 use std::{thread::JoinHandle, time::Duration};
 use tracing::{info, warn};
 
@@ -87,17 +87,17 @@ pub(crate) fn start_connectivity_thread(
     let total_in_slots = config.peers_categories.values().map(|v| v.max_in_connections_post_handshake).sum::<usize>() + config.default_category_info.max_in_connections_post_handshake;
     let total_out_slots = config.peers_categories.values().map(|v| v.target_out_connections).sum::<usize>() + config.default_category_info.target_out_connections;
     let operation_cache = Arc::new(RwLock::new(OperationCache::new(
-        NonZeroUsize::new(config.max_known_ops_size).unwrap(),
-        NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
+        config.max_known_ops_size.try_into().unwrap(),
+        (total_in_slots + total_out_slots).try_into().unwrap(),
     )));
     let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new(
-        NonZeroUsize::new(config.max_known_endorsements_size).unwrap(),
-        NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
+        config.max_known_endorsements_size.try_into().unwrap(),
+        (total_in_slots + total_out_slots).try_into().unwrap(),
     )));
     let block_cache = Arc::new(RwLock::new(BlockCache::new(
-        NonZeroUsize::new(config.max_known_blocks_size).unwrap(),
-        NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
+        config.max_known_blocks_size.try_into().unwrap(),
+        (total_in_slots + total_out_slots).try_into().unwrap(),
     )));
 
     // Start handlers
diff --git a/massa-protocol-worker/src/handlers/block_handler/cache.rs b/massa-protocol-worker/src/handlers/block_handler/cache.rs
index b78a7e88a85..43a860e611e 100644
--- a/massa-protocol-worker/src/handlers/block_handler/cache.rs
+++ b/massa-protocol-worker/src/handlers/block_handler/cache.rs
@@ -1,15 +1,16 @@
-use std::{num::NonZeroUsize, sync::Arc, time::Instant};
+use std::{sync::Arc, time::Instant};
 
-use lru::LruCache;
 use massa_models::{block_header::SecuredHeader, block_id::BlockId};
 use massa_protocol_exports::PeerId;
 use parking_lot::RwLock;
+use schnellru::{ByLength, LruMap};
+use tracing::log::warn;
 
 pub struct BlockCache {
-    pub checked_headers: LruCache<BlockId, SecuredHeader>,
+    pub checked_headers: LruMap<BlockId, SecuredHeader>,
     #[allow(clippy::type_complexity)]
-    pub blocks_known_by_peer: LruCache<PeerId, (LruCache<BlockId, (bool, Instant)>, Instant)>,
-    pub max_known_blocks_by_peer: NonZeroUsize,
+    pub blocks_known_by_peer: LruMap<PeerId, (LruMap<BlockId, (bool, Instant)>, Instant)>,
+    pub max_known_blocks_by_peer: u32,
 }
 
 impl BlockCache {
@@ -20,22 +21,29 @@ impl BlockCache {
         val: bool,
         timeout: Instant,
     ) {
-        let (blocks, _) = self
+        let Ok((blocks, _)) = self
             .blocks_known_by_peer
-            .get_or_insert_mut(from_peer_id.clone(), || {
-                (LruCache::new(self.max_known_blocks_by_peer), Instant::now())
-            });
+            .get_or_insert(from_peer_id.clone(), || {
+                (
+                    LruMap::new(ByLength::new(self.max_known_blocks_by_peer)),
+                    Instant::now(),
+                )
+            })
+            .ok_or(()) else {
+                warn!("blocks_known_by_peer limit reached");
+                return;
+            };
         for block_id in block_ids {
-            blocks.put(*block_id, (val, timeout));
+            blocks.insert(*block_id, (val, timeout));
         }
     }
 }
 
 impl BlockCache {
-    pub fn new(max_known_blocks: NonZeroUsize, max_known_blocks_by_peer: NonZeroUsize) -> Self {
+    pub fn new(max_known_blocks: u32, max_known_blocks_by_peer: u32) -> Self {
         Self {
-            checked_headers: LruCache::new(max_known_blocks),
-            blocks_known_by_peer: LruCache::new(max_known_blocks_by_peer),
+            checked_headers: LruMap::new(ByLength::new(max_known_blocks)),
+            blocks_known_by_peer: LruMap::new(ByLength::new(max_known_blocks_by_peer)),
             max_known_blocks_by_peer,
         }
     }
diff --git a/massa-protocol-worker/src/handlers/block_handler/propagation.rs b/massa-protocol-worker/src/handlers/block_handler/propagation.rs
index 26984cdc9b3..8ecc0ba99c8 100644
--- a/massa-protocol-worker/src/handlers/block_handler/propagation.rs
+++ b/massa-protocol-worker/src/handlers/block_handler/propagation.rs
@@ -1,12 +1,12 @@
-use std::{collections::VecDeque, num::NonZeroUsize, thread::JoinHandle, time::Instant};
+use std::{collections::VecDeque, thread::JoinHandle, time::Instant};
 
 use crossbeam::channel::{Receiver, Sender};
-use lru::LruCache;
 use massa_logging::massa_trace;
 use massa_models::{block_id::BlockId, prehash::PreHashSet};
 use massa_protocol_exports::PeerId;
 use massa_protocol_exports::{ProtocolConfig, ProtocolError};
 use massa_storage::Storage;
+use schnellru::{ByLength, LruMap};
 use tracing::{debug, info, warn};
 
 use crate::{
@@ -77,18 +77,18 @@ impl PropagationThread {
                         self.active_connections.get_peer_ids_connected();
                     for peer_id in peers {
                         if !peers_connected.contains(&peer_id) {
-                            cache_write.blocks_known_by_peer.pop(&peer_id);
+                            cache_write.blocks_known_by_peer.remove(&peer_id);
                         }
                     }
                     for peer_id in peers_connected {
-                        if !cache_write.blocks_known_by_peer.contains(&peer_id) {
+                        if cache_write.blocks_known_by_peer.peek(&peer_id).is_none() {
                             //TODO: Change to detect the connection before
-                            cache_write.blocks_known_by_peer.put(
+                            cache_write.blocks_known_by_peer.insert(
                                 peer_id,
                                 (
-                                    LruCache::new(
-                                        NonZeroUsize::new(self.config.max_node_known_blocks_size)
-                                            .expect("max_node_known_blocks_size in config must be > 0"),
+                                    LruMap::new(
+                                        ByLength::new(self.config.max_node_known_blocks_size.try_into()
+                                            .expect("max_node_known_blocks_size in config must be > 0")),
                                     ),
                                     Instant::now(),
                                 ),
@@ -98,7 +98,8 @@ impl PropagationThread {
                     {
                         let cache_read = self.cache.read();
-                        for (peer_id, (blocks_known, _)) in &cache_read.blocks_known_by_peer
+                        for (peer_id, (blocks_known, _)) in
+                            cache_read.blocks_known_by_peer.iter()
                         {
                             // peer that isn't asking for that block
                             let cond = blocks_known.peek(&block_id);
diff --git a/massa-protocol-worker/src/handlers/block_handler/retrieval.rs b/massa-protocol-worker/src/handlers/block_handler/retrieval.rs
index be862a8691d..8a7e02180ea 100644
--- a/massa-protocol-worker/src/handlers/block_handler/retrieval.rs
+++ b/massa-protocol-worker/src/handlers/block_handler/retrieval.rs
@@ -1,6 +1,5 @@
 use std::{
     collections::{hash_map::Entry, HashMap, HashSet},
-    num::NonZeroUsize,
     thread::JoinHandle,
     time::Instant,
 };
@@ -21,7 +20,6 @@ use crossbeam::{
     channel::{at, Receiver, Sender},
     select,
 };
-use lru::LruCache;
 use massa_consensus_exports::ConsensusController;
 use massa_hash::{Hash, HASH_SIZE_BYTES};
 use massa_logging::massa_trace;
@@ -42,6 +40,7 @@ use massa_serialization::{DeserializeError, Deserializer, Serializer};
 use massa_storage::Storage;
 use massa_time::TimeError;
 use massa_versioning::versioning::MipStore;
+use schnellru::{ByLength, LruMap};
 use tracing::{debug, info, warn};
 
 use super::{
@@ -309,7 +308,7 @@ impl RetrievalThread {
             let connected_peers = self.active_connections.get_peer_ids_connected();
             for peer_id in peers {
                 if !connected_peers.contains(&peer_id) {
-                    cache_write.blocks_known_by_peer.pop(&peer_id);
+                    cache_write.blocks_known_by_peer.remove(&peer_id);
                     self.asked_blocks.remove(&peer_id);
                 }
             }
@@ -499,18 +498,24 @@ impl RetrievalThread {
                 true,
                 Instant::now(),
             );
-            {
+            'write_cache: {
                 let mut endorsement_cache_write = self.endorsement_cache.write();
-                let endorsement_ids = endorsement_cache_write
+                let Ok(endorsement_ids) = endorsement_cache_write
                     .endorsements_known_by_peer
-                    .get_or_insert_mut(from_peer_id.clone(), || {
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_blocks_size)
+                    .get_or_insert(from_peer_id.clone(), || {
+                        LruMap::new(ByLength::new(
+                            self.config
+                                .max_node_known_blocks_size
+                                .try_into()
                                 .expect("max_node_known_blocks_size in config must be > 0"),
-                        )
-                    });
+                        ))
+                    })
+                    .ok_or(()) else {
+                        warn!("endorsements known by peer limit reached");
+                        break 'write_cache;
+                    };
                 for endorsement_id in block_header.content.endorsements.iter().map(|e| e.id) {
-                    endorsement_ids.put(endorsement_id, ());
+                    endorsement_ids.insert(endorsement_id, ());
                 }
             }
             return Ok(Some((block_id, false)));
@@ -557,7 +562,7 @@ impl RetrievalThread {
         }
         {
             let mut cache_write = self.cache.write();
-            cache_write.checked_headers.put(block_id, header.clone());
+            cache_write.checked_headers.insert(block_id, header.clone());
             cache_write.insert_blocks_known(from_peer_id, &[block_id], true, Instant::now());
             cache_write.insert_blocks_known(
                 from_peer_id,
@@ -565,18 +570,24 @@ impl RetrievalThread {
                 true,
                 Instant::now(),
            );
-            {
+            'write_cache: {
                 let mut endorsement_cache_write = self.endorsement_cache.write();
-                let endorsement_ids = endorsement_cache_write
+                let Ok(endorsement_ids) = endorsement_cache_write
                     .endorsements_known_by_peer
-                    .get_or_insert_mut(from_peer_id.clone(), || {
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_blocks_size)
+                    .get_or_insert(from_peer_id.clone(), || {
+                        LruMap::new(ByLength::new(
+                            self.config
+                                .max_node_known_blocks_size
+                                .try_into()
                                 .expect("max_node_known_blocks_size in config must be > 0"),
-                        )
-                    });
+                        ))
+                    })
+                    .ok_or(()) else {
+                        warn!("endorsements_known_by_peer limit reached");
+                        break 'write_cache;
+                    };
                 for endorsement_id in header.content.endorsements.iter().map(|e| e.id) {
-                    endorsement_ids.put(endorsement_id, ());
+                    endorsement_ids.insert(endorsement_id, ());
                 }
             }
         }
@@ -626,7 +637,11 @@ impl RetrievalThread {
             // check endorsement signature if not already checked
             {
                 let read_cache = self.endorsement_cache.read();
-                if !read_cache.checked_endorsements.contains(&endorsement_id) {
+                if read_cache
+                    .checked_endorsements
+                    .peek(&endorsement_id)
+                    .is_none()
+                {
                     new_endorsements.insert(endorsement_id, endorsement);
                 }
             }
@@ -647,24 +662,29 @@ impl RetrievalThread {
                     )
                 })
                 .collect::<Vec<_>>(),
         )?;
-        {
+        'write_cache: {
             let mut cache_write = self.endorsement_cache.write();
             // add to verified signature cache
             for endorsement_id in endorsement_ids.iter() {
-                cache_write.checked_endorsements.put(*endorsement_id, ());
+                cache_write.checked_endorsements.insert(*endorsement_id, ());
             }
             // add to known endorsements for source node.
-            let endorsements = cache_write.endorsements_known_by_peer.get_or_insert_mut(
-                from_peer_id.clone(),
-                || {
-                    LruCache::new(
-                        NonZeroUsize::new(self.config.max_node_known_endorsements_size)
+            let Ok(endorsements) = cache_write
+                .endorsements_known_by_peer
+                .get_or_insert(from_peer_id.clone(), || {
+                    LruMap::new(ByLength::new(
+                        self.config
+                            .max_node_known_endorsements_size
+                            .try_into()
                             .expect("max_node_known_endorsements_size in config should be > 0"),
-                    )
-                },
-            );
+                    ))
+                })
+                .ok_or(()) else {
+                    warn!("endorsements_known_by_peer limit reached");
+                    break 'write_cache;
+                };
             for endorsement_id in endorsement_ids.iter() {
-                endorsements.put(*endorsement_id, ());
+                endorsements.insert(*endorsement_id, ());
             }
         }
@@ -704,19 +724,24 @@ impl RetrievalThread {
         let operation_ids_set: PreHashSet<OperationId> = operation_ids.iter().cloned().collect();
         // add to known ops
-        {
+        'write_cache: {
             let mut cache_write = self.operation_cache.write();
-            let known_ops =
-                cache_write
-                    .ops_known_by_peer
-                    .get_or_insert_mut(from_peer_id.clone(), || {
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_ops_size)
-                                .expect("max_node_known_ops_size in config should be > 0"),
-                        )
-                    });
+            let Ok(known_ops) = cache_write
+                .ops_known_by_peer
+                .get_or_insert(from_peer_id.clone(), || {
+                    LruMap::new(ByLength::new(
+                        self.config
+                            .max_node_known_ops_size
+                            .try_into()
+                            .expect("max_node_known_ops_size in config should be > 0"),
+                    ))
+                })
+                .ok_or(()) else {
+                    warn!("ops_known_by_peer limitation reached");
+                    break 'write_cache;
+                };
             for op_id in operation_ids_set.iter() {
-                known_ops.put(op_id.prefix(), ());
+                known_ops.insert(op_id.prefix(), ());
             }
         }
 
         let info = if let Some(info) = self.block_wishlist.get_mut(&block_id) {
@@ -1014,11 +1039,12 @@ impl RetrievalThread {
             };
             // Check operation signature only if not already checked.
-            if !self
+            if self
                 .operation_cache
                 .read()
                 .checked_operations
-                .contains(&operation_id)
+                .peek(&operation_id)
+                .is_none()
             {
                 // check signature if the operation wasn't in `checked_operation`
                 new_operations.insert(operation_id, operation);
             };
@@ -1093,7 +1119,7 @@ impl RetrievalThread {
             .collect();
         for peer_id in peers_in_cache {
             if !peers_connected.contains(&peer_id) {
-                cache_write.blocks_known_by_peer.pop(&peer_id);
+                cache_write.blocks_known_by_peer.remove(&peer_id);
             }
         }
         let peers_in_asked_blocks: Vec<PeerId> =
@@ -1105,20 +1131,23 @@ impl RetrievalThread {
         }
         // Add new peers
         for peer_id in peers_connected {
-            if !cache_write.blocks_known_by_peer.contains(&peer_id) {
+            if cache_write.blocks_known_by_peer.get(&peer_id).is_none() {
                 //TODO: Change to detect the connection before
-                cache_write.blocks_known_by_peer.put(
+                cache_write.blocks_known_by_peer.insert(
                     peer_id.clone(),
                     (
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_blocks_size)
+                        LruMap::new(ByLength::new(
+                            self.config
+                                .max_node_known_blocks_size
+                                .try_into()
                                 .expect("max_node_known_blocks_size in config must be > 0"),
-                        ),
+                        )),
                         Instant::now(),
                     ),
                 );
             } else {
-                cache_write.blocks_known_by_peer.promote(&peer_id);
+                // Promote peer_id as the newest used key
+                cache_write.blocks_known_by_peer.get(&peer_id);
            }
 
            if !self.asked_blocks.contains_key(&peer_id) {
@@ -1126,7 +1155,16 @@ impl RetrievalThread {
                     .insert(peer_id.clone(), PreHashMap::default());
             }
         }
-        for (peer_id, (blocks_known, _)) in cache_write.blocks_known_by_peer.iter_mut() {
+        let all_keys: Vec<PeerId> = cache_write
+            .blocks_known_by_peer
+            .iter()
+            .map(|(k, _)| k)
+            .cloned()
+            .collect();
+        for peer_id in all_keys.iter() {
+            // for (peer_id, (blocks_known, _)) in cache_write.blocks_known_by_peer.iter() {
+            let (blocks_known, _) =
+                cache_write.blocks_known_by_peer.peek_mut(peer_id).unwrap();
             // map to remove the borrow on asked_blocks. Otherwise can't call insert_known_blocks
             let ask_time_opt = self
                 .asked_blocks
@@ -1167,10 +1205,10 @@ impl RetrievalThread {
                         continue; // not a candidate
                     }
                     // timed out, supposed to have it
-                    (true, Some(timeout_at), Some((true, info_time))) => {
-                        if info_time < &timeout_at {
+                    (true, Some(mut timeout_at), Some((true, info_time))) => {
+                        if info_time < &mut timeout_at {
                             // info less recent than timeout: mark as not having it
-                            blocks_known.put(*hash, (false, timeout_at));
+                            blocks_known.insert(*hash, (false, timeout_at));
                             (2u8, ask_time_opt)
                         } else {
                             // told us it has it after a timeout: good candidate again
@@ -1178,16 +1216,16 @@ impl RetrievalThread {
                         }
                     }
                     // timed out, supposed to not have it
-                    (true, Some(timeout_at), Some((false, info_time))) => {
-                        if info_time < &timeout_at {
+                    (true, Some(mut timeout_at), Some((false, info_time))) => {
+                        if info_time < &mut timeout_at {
                             // info less recent than timeout: update info time
-                            blocks_known.put(*hash, (false, timeout_at));
+                            blocks_known.insert(*hash, (false, timeout_at));
                         }
                         (2u8, ask_time_opt)
                     }
                     // timed out but don't know if has it: mark as not having it
                     (true, Some(timeout_at), None) => {
-                        blocks_known.put(*hash, (false, timeout_at));
+                        blocks_known.insert(*hash, (false, timeout_at));
                         (2u8, ask_time_opt)
                     }
                 };
diff --git a/massa-protocol-worker/src/handlers/endorsement_handler/cache.rs b/massa-protocol-worker/src/handlers/endorsement_handler/cache.rs
index 310e3619a63..86ac7e43748 100644
--- a/massa-protocol-worker/src/handlers/endorsement_handler/cache.rs
+++ b/massa-protocol-worker/src/handlers/endorsement_handler/cache.rs
@@ -1,23 +1,20 @@
-use std::{num::NonZeroUsize, sync::Arc};
+use std::sync::Arc;
 
-use lru::LruCache;
 use massa_models::endorsement::EndorsementId;
 use massa_protocol_exports::PeerId;
 use parking_lot::RwLock;
+use schnellru::{ByLength, LruMap};
 
 pub struct EndorsementCache {
-    pub checked_endorsements: LruCache<EndorsementId, ()>,
-    pub endorsements_known_by_peer: LruCache<PeerId, LruCache<EndorsementId, ()>>,
+    pub checked_endorsements: LruMap<EndorsementId, ()>,
+    pub endorsements_known_by_peer: LruMap<PeerId, LruMap<EndorsementId, ()>>,
 }
 
 impl EndorsementCache {
-    pub fn new(
-        max_known_endorsements: NonZeroUsize,
-        max_known_endorsements_by_peer: NonZeroUsize,
-    ) -> Self {
+    pub fn new(max_known_endorsements: u32, max_known_endorsements_by_peer: u32) -> Self {
         Self {
-            checked_endorsements: LruCache::new(max_known_endorsements),
-            endorsements_known_by_peer: LruCache::new(max_known_endorsements_by_peer),
+            checked_endorsements: LruMap::new(ByLength::new(max_known_endorsements)),
+            endorsements_known_by_peer: LruMap::new(ByLength::new(max_known_endorsements_by_peer)),
         }
     }
 }
diff --git a/massa-protocol-worker/src/handlers/endorsement_handler/propagation.rs b/massa-protocol-worker/src/handlers/endorsement_handler/propagation.rs
index a431dfbda1c..f97327fccaf 100644
--- a/massa-protocol-worker/src/handlers/endorsement_handler/propagation.rs
+++ b/massa-protocol-worker/src/handlers/endorsement_handler/propagation.rs
@@ -1,13 +1,13 @@
-use std::{num::NonZeroUsize, thread::JoinHandle};
+use std::thread::JoinHandle;
 
 use crossbeam::channel::Receiver;
-use lru::LruCache;
 use massa_models::{
     endorsement::{EndorsementId, SecureShareEndorsement},
     prehash::{PreHashMap, PreHashSet},
 };
 use massa_protocol_exports::PeerId;
 use massa_protocol_exports::ProtocolConfig;
+use schnellru::{ByLength, LruMap};
 use tracing::{debug, info, log::warn};
 
 use crate::{messages::MessagesSerializer, wrap_network::ActiveConnectionsTrait};
@@ -57,22 +57,25 @@ impl PropagationThread {
                    {
                        let mut cache_write = self.cache.write();
                        for endorsement_id in endorsements_ids.iter().copied() {
-                            cache_write.checked_endorsements.put(endorsement_id, ());
+                            cache_write.checked_endorsements.insert(endorsement_id, ());
                        }
                        // Add peers that potentially don't exist in cache
                        let peer_connected = self.active_connections.get_peer_ids_connected();
                        for peer_id in &peer_connected {
-                            if !cache_write.endorsements_known_by_peer.contains(peer_id) {
-                                cache_write.endorsements_known_by_peer.put(
+                            if cache_write
+                                .endorsements_known_by_peer
+                                .get(peer_id)
+                                .is_none()
+                            {
+                                cache_write.endorsements_known_by_peer.insert(
                                    peer_id.clone(),
-                                    LruCache::new(
-                                        NonZeroUsize::new(
-                                            self.config.max_node_known_endorsements_size,
-                                        )
+                                    LruMap::new(
+                                        ByLength::new(
+                                            self.config.max_node_known_endorsements_size.try_into()
                                        .expect(
                                            "max_node_known_endorsements_size in config is > 0",
-                                        ),
+                                        )),
                                    ),
                                );
                            }
@@ -85,12 +88,23 @@ impl PropagationThread {
                        // Clean shared cache if peers do not exist anymore
                        for peer_id in peers {
                            if !peer_connected.contains(&peer_id) {
-                                cache_write.endorsements_known_by_peer.pop(&peer_id);
+                                cache_write.endorsements_known_by_peer.remove(&peer_id);
                            }
                        }
-                        for (peer_id, endorsement_ids) in
-                            cache_write.endorsements_known_by_peer.iter_mut()
-                        {
+                        let all_keys: Vec<PeerId> = cache_write
+                            .endorsements_known_by_peer
+                            .iter()
+                            .map(|(k, _)| k)
+                            .cloned()
+                            .collect();
+                        for peer_id in all_keys.iter() {
+                            // for (peer_id, endorsement_ids) in
+                            //     cache_write.endorsements_known_by_peer.iter()
+                            // {
+                            let endorsement_ids = cache_write
+                                .endorsements_known_by_peer
+                                .peek_mut(peer_id)
+                                .unwrap();
                            let new_endorsements: PreHashMap<
                                EndorsementId,
                                SecureShareEndorsement,
@@ -100,7 +114,7 @@ impl PropagationThread {
                                    .get_endorsement_refs()
                                    .iter()
                                    .filter_map(|id| {
-                                        if endorsement_ids.contains(id) {
+                                        if endorsement_ids.peek(id).is_some() {
                                            return None;
                                        }
                                        Some((
@@ -111,7 +125,7 @@ impl PropagationThread {
                                    .collect()
                            };
                            for endorsement_id in new_endorsements.keys().copied() {
-                                endorsement_ids.put(endorsement_id, ());
+                                endorsement_ids.insert(endorsement_id, ());
                            }
                            let to_send =
                                new_endorsements.into_values().collect::<Vec<_>>();
diff --git a/massa-protocol-worker/src/handlers/endorsement_handler/retrieval.rs b/massa-protocol-worker/src/handlers/endorsement_handler/retrieval.rs
index bd4d46046ad..fca8c1a23e3 100644
--- a/massa-protocol-worker/src/handlers/endorsement_handler/retrieval.rs
+++ b/massa-protocol-worker/src/handlers/endorsement_handler/retrieval.rs
@@ -1,10 +1,9 @@
-use std::{num::NonZeroUsize, thread::JoinHandle};
+use std::thread::JoinHandle;
 
 use crossbeam::{
     channel::{Receiver, Sender},
     select,
 };
-use lru::LruCache;
 use massa_logging::massa_trace;
 use massa_models::{
     endorsement::SecureShareEndorsement,
@@ -17,6 +16,7 @@ use massa_protocol_exports::{ProtocolConfig, ProtocolError};
 use massa_serialization::{DeserializeError, Deserializer};
 use massa_storage::Storage;
 use massa_time::MassaTime;
+use schnellru::{ByLength, LruMap};
 use tracing::{debug, info, warn};
 
 use crate::{
@@ -140,7 +140,11 @@ impl RetrievalThread {
                // check endorsement signature if not already checked
                {
                    let read_cache = self.cache.read();
-                    if !read_cache.checked_endorsements.contains(&endorsement_id) {
+                    if read_cache
+                        .checked_endorsements
+                        .peek(&endorsement_id)
+                        .is_none()
+                    {
                        new_endorsements.insert(endorsement_id, endorsement);
                    }
                }
@@ -160,24 +164,29 @@ impl RetrievalThread {
                    )
                })
                .collect::<Vec<_>>(),
        )?;
-        {
+        'write_cache: {
            let mut cache_write = self.cache.write();
            // add to verified signature cache
            for endorsement_id in endorsement_ids.iter() {
-                cache_write.checked_endorsements.put(*endorsement_id, ());
+                cache_write.checked_endorsements.insert(*endorsement_id, ());
            }
            // add to known endorsements for source node.
-            let endorsements = cache_write.endorsements_known_by_peer.get_or_insert_mut(
-                from_peer_id.clone(),
-                || {
-                    LruCache::new(
-                        NonZeroUsize::new(self.config.max_node_known_endorsements_size)
+            let Ok(endorsements) = cache_write
+                .endorsements_known_by_peer
+                .get_or_insert(from_peer_id.clone(), || {
+                    LruMap::new(ByLength::new(
+                        self.config
+                            .max_node_known_endorsements_size
+                            .try_into()
                             .expect("max_node_known_endorsements_size in config should be > 0"),
-                    )
-                },
-            );
+                    ))
+                })
+                .ok_or(()) else {
+                    warn!("endorsements_known_by_peer limit reached");
+                    break 'write_cache;
+                };
            for endorsement_id in endorsement_ids.iter() {
-                endorsements.put(*endorsement_id, ());
+                endorsements.insert(*endorsement_id, ());
            }
        }
diff --git a/massa-protocol-worker/src/handlers/operation_handler/cache.rs b/massa-protocol-worker/src/handlers/operation_handler/cache.rs
index 7d2cd6cee0e..79638d50be4 100644
--- a/massa-protocol-worker/src/handlers/operation_handler/cache.rs
+++ b/massa-protocol-worker/src/handlers/operation_handler/cache.rs
@@ -1,29 +1,29 @@
-use std::{num::NonZeroUsize, sync::Arc};
+use std::sync::Arc;
 
-use lru::LruCache;
 use massa_models::operation::{OperationId, OperationPrefixId};
 use massa_protocol_exports::PeerId;
 use parking_lot::RwLock;
+use schnellru::{ByLength, LruMap};
 
 pub struct OperationCache {
-    pub checked_operations: LruCache<OperationId, ()>,
-    pub checked_operations_prefix: LruCache<OperationPrefixId, ()>,
-    pub ops_known_by_peer: LruCache<PeerId, LruCache<OperationPrefixId, ()>>,
+    pub checked_operations: LruMap<OperationId, ()>,
+    pub checked_operations_prefix: LruMap<OperationPrefixId, ()>,
+    pub ops_known_by_peer: LruMap<PeerId, LruMap<OperationPrefixId, ()>>,
 }
 
 impl OperationCache {
-    pub fn new(max_known_ops: NonZeroUsize, max_known_ops_by_peer: NonZeroUsize) -> Self {
+    pub fn new(max_known_ops: u32, max_known_ops_by_peer: u32) -> Self {
         Self {
-            checked_operations: LruCache::new(max_known_ops),
-            checked_operations_prefix: LruCache::new(max_known_ops),
-            ops_known_by_peer: LruCache::new(max_known_ops_by_peer),
+            checked_operations: LruMap::new(ByLength::new(max_known_ops)),
+            checked_operations_prefix: LruMap::new(ByLength::new(max_known_ops)),
+            ops_known_by_peer: LruMap::new(ByLength::new(max_known_ops_by_peer)),
         }
     }
 
     pub fn insert_checked_operation(&mut self, operation_id: OperationId) {
-        self.checked_operations.put(operation_id, ());
+        self.checked_operations.insert(operation_id, ());
         self.checked_operations_prefix
-            .put(operation_id.prefix(), ());
+            .insert(operation_id.prefix(), ());
     }
 }
diff --git a/massa-protocol-worker/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker/src/handlers/operation_handler/propagation.rs
index 9ebfdf948ba..6ea3843a3e2 100644
--- a/massa-protocol-worker/src/handlers/operation_handler/propagation.rs
+++ b/massa-protocol-worker/src/handlers/operation_handler/propagation.rs
@@ -1,11 +1,11 @@
-use std::{mem, num::NonZeroUsize, thread::JoinHandle};
+use std::{mem, thread::JoinHandle};
 
 use crossbeam::channel::{Receiver, RecvTimeoutError};
-use lru::LruCache;
 use massa_logging::massa_trace;
 use massa_models::operation::OperationId;
 use massa_protocol_exports::PeerId;
 use massa_protocol_exports::ProtocolConfig;
+use schnellru::{ByLength, LruMap};
 use tracing::{debug, info, log::warn};
 
 use crate::{
@@ -96,33 +96,42 @@ impl PropagationThread {
                    for peer_id in peers {
                        if !peers_connected.contains(&peer_id) {
-                            cache_write.ops_known_by_peer.pop(&peer_id);
+                            cache_write.ops_known_by_peer.remove(&peer_id);
                        }
                    }
                    // Add new potential peers
                    for peer_id in peers_connected {
-                        if !cache_write.ops_known_by_peer.contains(&peer_id) {
-                            cache_write.ops_known_by_peer.put(
+                        if cache_write.ops_known_by_peer.peek(&peer_id).is_none() {
+                            cache_write.ops_known_by_peer.insert(
                                peer_id.clone(),
-                                LruCache::new(
-                                    NonZeroUsize::new(self.config.max_node_known_ops_size)
+                                LruMap::new(ByLength::new(
+                                    self.config
+                                        .max_node_known_ops_size
+                                        .try_into()
                                        .expect("max_node_known_endorsements_size in config is > 0"),
-                                ),
+                                )),
                            );
                        }
                    }
                    // Propagate to peers
-                    for (peer_id, ops) in cache_write.ops_known_by_peer.iter_mut() {
+                    let all_keys: Vec<PeerId> = cache_write
+                        .ops_known_by_peer
+                        .iter()
+                        .map(|(k, _)| k)
+                        .cloned()
+                        .collect();
+                    for peer_id in all_keys {
+                        let ops = cache_write.ops_known_by_peer.peek_mut(&peer_id).unwrap();
                        let new_ops: Vec<OperationId> = operation_ids
                            .iter()
-                            .filter(|id| !ops.contains(&id.prefix()))
+                            .filter(|id| ops.peek(&id.prefix()).is_none())
                            .copied()
                            .collect();
                        if !new_ops.is_empty() {
                            for id in &new_ops {
-                                ops.put(id.prefix(), ());
+                                ops.insert(id.prefix(), ());
                            }
                            debug!(
                                "Send operations announcement of len {} to {}",
@@ -132,7 +141,7 @@ impl PropagationThread {
                            for sub_list in new_ops.chunks(self.config.max_operations_per_message as usize) {
                                if let Err(err) = self.active_connections.send_to_peer(
-                                    peer_id,
+                                    &peer_id,
                                    &self.operation_message_serializer,
                                    OperationMessage::OperationsAnnouncement(
                                        sub_list.iter().map(|id| id.into_prefix()).collect(),
diff --git a/massa-protocol-worker/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker/src/handlers/operation_handler/retrieval.rs
index 6fb5409a968..87f1b7cd409 100644
--- a/massa-protocol-worker/src/handlers/operation_handler/retrieval.rs
+++ b/massa-protocol-worker/src/handlers/operation_handler/retrieval.rs
@@ -1,6 +1,5 @@
 use std::{
     collections::{HashMap, VecDeque},
-    num::NonZeroUsize,
     thread::JoinHandle,
     time::Instant,
 };
@@ -9,7 +8,6 @@ use crossbeam::{
     channel::{tick, Receiver, Sender},
     select,
 };
-use lru::LruCache;
 use massa_logging::massa_trace;
 use massa_models::{
     operation::{OperationId, OperationPrefixId, OperationPrefixIds, SecureShareOperation},
@@ -24,6 +22,7 @@ use massa_protocol_exports::{ProtocolConfig, ProtocolError};
 use massa_serialization::{DeserializeError, Deserializer};
 use massa_storage::Storage;
 use massa_time::{MassaTime, TimeError};
+use schnellru::{ByLength, LruMap};
 
 use crate::{
     handlers::peer_handler::models::{PeerManagementCmd, PeerMessageTuple},
@@ -57,7 +56,7 @@ pub struct RetrievalThread {
     receiver: Receiver<PeerMessageTuple>,
     pool_controller: Box<dyn PoolController>,
     cache: SharedOperationCache,
-    asked_operations: LruCache<OperationId, (Instant, Vec<PeerId>)>,
+    asked_operations: LruMap<OperationId, (Instant, Vec<PeerId>)>,
     active_connections: Box<dyn ActiveConnectionsTrait>,
     op_batch_buffer: VecDeque<OperationBatchItem>,
     stored_operations: HashMap<Instant, PreHashSet<OperationId>>,
@@ -194,7 +193,13 @@ impl RetrievalThread {
                 received_ids.insert(operation_id);
                 // Check operation signature only if not already checked.
-                if !self.cache.read().checked_operations.contains(&operation_id) {
+                if self
+                    .cache
+                    .read()
+                    .checked_operations
+                    .peek(&operation_id)
+                    .is_none()
+                {
                     // check signature if the operation wasn't in `checked_operation`
                     new_operations.insert(operation_id, operation);
                 };
@@ -208,7 +213,7 @@ impl RetrievalThread {
                 .collect::<Vec<_>>(),
         )?;
-        {
+        'write_cache: {
             // add to checked operations
             let mut cache_write = self.cache.write();
             for op_id in new_operations.keys().copied() {
@@ -216,17 +221,22 @@ impl RetrievalThread {
             }
 
             // add to known ops
-            let known_ops =
-                cache_write
-                    .ops_known_by_peer
-                    .get_or_insert_mut(source_peer_id.clone(), || {
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_ops_size)
-                                .expect("max_node_known_ops_size in config must be > 0"),
-                        )
-                    });
+            let Ok(known_ops) = cache_write
+                .ops_known_by_peer
+                .get_or_insert(source_peer_id.clone(), || {
+                    LruMap::new(ByLength::new(
+                        self.config
+                            .max_node_known_ops_size
+                            .try_into()
+                            .expect("max_node_known_ops_size in config must be > 0"),
+                    ))
+                })
+                .ok_or(()) else {
+                    warn!("ops_known_by_peer limitation reached");
+                    break 'write_cache;
+                };
             for id in received_ids {
-                known_ops.put(id.prefix(), ());
+                known_ops.insert(id.prefix(), ());
             }
         }
@@ -308,26 +318,31 @@ impl RetrievalThread {
         peer_id: &PeerId,
     ) -> Result<(), ProtocolError> {
         // mark sender as knowing the ops
-        {
+        'write_cache: {
             let mut cache_write = self.cache.write();
-            let known_ops =
-                cache_write
-                    .ops_known_by_peer
-                    .get_or_insert_mut(peer_id.clone(), || {
-                        LruCache::new(
-                            NonZeroUsize::new(self.config.max_node_known_ops_size)
-                                .expect("max_node_known_ops_size in config must be > 0"),
-                        )
-                    });
+            let Ok(known_ops) = cache_write
+                .ops_known_by_peer
+                .get_or_insert(peer_id.clone(), || {
+                    LruMap::new(ByLength::new(
+                        self.config
+                            .max_node_known_ops_size
+                            .try_into()
+                            .expect("max_node_known_ops_size in config must be > 0"),
+                    ))
+                })
+                .ok_or(()) else {
+                    warn!("ops_known_by_peer limitation reached");
+                    break 'write_cache;
+                };
             for prefix in &op_batch {
-                known_ops.put(*prefix, ());
+                known_ops.insert(*prefix, ());
            }
        }
 
        // filter out the operations that we already know about
        {
            let cache_read = self.cache.read();
-            op_batch.retain(|prefix| !cache_read.checked_operations_prefix.contains(prefix));
+            op_batch.retain(|prefix| cache_read.checked_operations_prefix.peek(prefix).is_none());
        }
 
        let mut ask_set = OperationPrefixIds::with_capacity(op_batch.len());
@@ -336,7 +351,7 @@ impl RetrievalThread {
         let now = Instant::now();
         let mut count_reask = 0;
         for op_id in op_batch {
-            let wish = match self.asked_operations.get_mut(&op_id) {
+            let wish = match self.asked_operations.get(&op_id) {
                 Some(wish) => {
                     if wish.1.contains(peer_id) {
                         continue; // already asked to the `peer_id`
@@ -364,7 +379,7 @@ impl RetrievalThread {
             } else {
                 ask_set.insert(op_id);
                 self.asked_operations
-                    .put(op_id, (now, vec![peer_id.clone()]));
+                    .insert(op_id, (now, vec![peer_id.clone()]));
             }
         }
         // EndOf for op_id in op_batch:
@@ -405,7 +420,7 @@ impl RetrievalThread {
                 warn!("Failed to send AskForOperations message to peer: {}", err);
                 {
                     let mut cache_write = self.cache.write();
-                    cache_write.ops_known_by_peer.pop(peer_id);
+                    cache_write.ops_known_by_peer.remove(peer_id);
                 }
             }
         }
@@ -469,7 +484,7 @@ impl RetrievalThread {
                 warn!("Failed to send Operations message to peer: {}", err);
                 {
                     let mut cache_write = self.cache.write();
-                    cache_write.ops_known_by_peer.pop(peer_id);
+                    cache_write.ops_known_by_peer.remove(peer_id);
                 }
             }
         }
@@ -509,10 +524,12 @@ pub fn start_retrieval_thread(
             receiver_ext,
             cache,
             active_connections,
-            asked_operations: LruCache::new(
-                NonZeroUsize::new(config.asked_operations_buffer_capacity)
+            asked_operations: LruMap::new(ByLength::new(
+                config
+                    .asked_operations_buffer_capacity
+                    .try_into()
                     .expect("asked_operations_buffer_capacity in config must be > 0"),
-            ),
+            )),
             config,
             operation_message_serializer: MessagesSerializer::new()
                 .with_operation_message_serializer(OperationMessageSerializer::new()),
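
Reviewer note (not part of the diff): the same lru -> schnellru mapping recurs in every file above. The sketch below is a minimal, self-contained Rust illustration of that mapping, using only the schnellru calls that appear in this diff (ByLength, insert, peek, get, remove, get_or_insert, peek_mut). It assumes schnellru 0.2; the String/u64 key and value types are simplified stand-ins for the PeerId/BlockId types used in massa and are not part of the actual code.

// Minimal sketch of the lru -> schnellru migration pattern applied in this PR.
use schnellru::{ByLength, LruMap};

fn main() {
    // lru::LruCache::new(NonZeroUsize) becomes LruMap::new(ByLength::new(u32)),
    // which is why the config sizes are converted with try_into() above.
    let mut blocks_known_by_peer: LruMap<String, LruMap<u64, ()>> =
        LruMap::new(ByLength::new(1024));

    // get_or_insert replaces get_or_insert_mut and returns an Option:
    // None means the limiter refused the insertion, which the diff turns
    // into a warn! plus an early exit instead of a panic.
    let peer = "peer_1".to_string();
    let Ok(blocks_known) = blocks_known_by_peer
        .get_or_insert(peer.clone(), || LruMap::new(ByLength::new(16)))
        .ok_or(())
    else {
        eprintln!("blocks_known_by_peer limit reached");
        return;
    };

    // put(k, v) becomes insert(k, v).
    blocks_known.insert(42, ());

    // contains(&k) becomes peek(&k).is_some(): a membership test that does
    // not promote the entry in the LRU order.
    assert!(blocks_known_by_peer.peek(&peer).is_some());

    // promote(&k) becomes a plain get(&k), which moves the entry to the
    // most-recently-used position.
    let _ = blocks_known_by_peer.get(&peer);

    // pop(&k) becomes remove(&k).
    let _ = blocks_known_by_peer.remove(&peer);
}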