Skip to content

Commit

Permalink
Use schnellru instead of lru for Lru caching system
Browse files Browse the repository at this point in the history
Signed-off-by: Litchi Pi <litchi.pi@proton.me>
  • Loading branch information
litchipi committed May 24, 2023
1 parent 0e51272 commit b63c8b7
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 187 deletions.
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ num_enum = "0.5"
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
tempfile = { version = "3.3", optional = true } # use with testing feature
rayon = "1.7.0"
lru = "0.10.0"
schnellru = "0.2.1"

# modules Custom
massa_hash = { path = "../massa-hash" }
Expand Down
14 changes: 7 additions & 7 deletions massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use parking_lot::RwLock;
use peernet::transports::TcpOutConnectionConfig;
use peernet::{peer::PeerConnectionType, transports::OutConnectionConfig};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{collections::HashMap, net::IpAddr};
use std::{num::NonZeroUsize, sync::Arc};
use std::{thread::JoinHandle, time::Duration};
use tracing::{info, warn};

Expand Down Expand Up @@ -87,17 +87,17 @@ pub(crate) fn start_connectivity_thread(
let total_in_slots = config.peers_categories.values().map(|v| v.max_in_connections_post_handshake).sum::<usize>() + config.default_category_info.max_in_connections_post_handshake;
let total_out_slots = config.peers_categories.values().map(| v| v.target_out_connections).sum::<usize>() + config.default_category_info.target_out_connections;
let operation_cache = Arc::new(RwLock::new(OperationCache::new(
NonZeroUsize::new(config.max_known_ops_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_blocks_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap(),
)));
let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new(
NonZeroUsize::new(config.max_known_endorsements_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_endorsements_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap()
)));

let block_cache = Arc::new(RwLock::new(BlockCache::new(
NonZeroUsize::new(config.max_known_blocks_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_blocks_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap(),
)));

// Start handlers
Expand Down
29 changes: 17 additions & 12 deletions massa-protocol-worker/src/handlers/block_handler/cache.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{num::NonZeroUsize, sync::Arc, time::Instant};
use std::{sync::Arc, time::Instant};

use lru::LruCache;
use massa_models::{block_header::SecuredHeader, block_id::BlockId};
use massa_protocol_exports::PeerId;
use parking_lot::RwLock;
use schnellru::{ByLength, LruMap};

pub struct BlockCache {
pub checked_headers: LruCache<BlockId, SecuredHeader>,
pub checked_headers: LruMap<BlockId, SecuredHeader>,
#[allow(clippy::type_complexity)]
pub blocks_known_by_peer: LruCache<PeerId, (LruCache<BlockId, (bool, Instant)>, Instant)>,
pub max_known_blocks_by_peer: NonZeroUsize,
pub blocks_known_by_peer: LruMap<PeerId, (LruMap<BlockId, (bool, Instant)>, Instant)>,
pub max_known_blocks_by_peer: u32,
}

impl BlockCache {
Expand All @@ -22,20 +22,25 @@ impl BlockCache {
) {
let (blocks, _) = self
.blocks_known_by_peer
.get_or_insert_mut(from_peer_id.clone(), || {
(LruCache::new(self.max_known_blocks_by_peer), Instant::now())
});
.get_or_insert(from_peer_id.clone(), || {
(
LruMap::new(ByLength::new(self.max_known_blocks_by_peer)),
Instant::now(),
)
})
.ok_or(())
.expect("blocks_known_by_peer limit reached");
for block_id in block_ids {
blocks.put(*block_id, (val, timeout));
blocks.insert(*block_id, (val, timeout));
}
}
}

impl BlockCache {
pub fn new(max_known_blocks: NonZeroUsize, max_known_blocks_by_peer: NonZeroUsize) -> Self {
pub fn new(max_known_blocks: u32, max_known_blocks_by_peer: u32) -> Self {
Self {
checked_headers: LruCache::new(max_known_blocks),
blocks_known_by_peer: LruCache::new(max_known_blocks_by_peer),
checked_headers: LruMap::new(ByLength::new(max_known_blocks)),
blocks_known_by_peer: LruMap::new(ByLength::new(max_known_blocks_by_peer)),
max_known_blocks_by_peer,
}
}
Expand Down
19 changes: 10 additions & 9 deletions massa-protocol-worker/src/handlers/block_handler/propagation.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{collections::VecDeque, num::NonZeroUsize, thread::JoinHandle, time::Instant};
use std::{collections::VecDeque, thread::JoinHandle, time::Instant};

use crossbeam::channel::{Receiver, Sender};
use lru::LruCache;
use massa_logging::massa_trace;
use massa_models::{block_id::BlockId, prehash::PreHashSet};
use massa_protocol_exports::PeerId;
use massa_protocol_exports::{ProtocolConfig, ProtocolError};
use massa_storage::Storage;
use schnellru::{ByLength, LruMap};
use tracing::{debug, info, warn};

use crate::{
Expand Down Expand Up @@ -77,18 +77,18 @@ impl PropagationThread {
self.active_connections.get_peer_ids_connected();
for peer_id in peers {
if !peers_connected.contains(&peer_id) {
cache_write.blocks_known_by_peer.pop(&peer_id);
cache_write.blocks_known_by_peer.remove(&peer_id);
}
}
for peer_id in peers_connected {
if !cache_write.blocks_known_by_peer.contains(&peer_id) {
if cache_write.blocks_known_by_peer.get(&peer_id).is_none() {
//TODO: Change to detect the connection before
cache_write.blocks_known_by_peer.put(
cache_write.blocks_known_by_peer.insert(
peer_id,
(
LruCache::new(
NonZeroUsize::new(self.config.max_node_known_blocks_size)
.expect("max_node_known_blocks_size in config must be > 0"),
LruMap::new(
ByLength::new(self.config.max_node_known_blocks_size.try_into()
.expect("max_node_known_blocks_size in config must be > 0")),
),
Instant::now(),
),
Expand All @@ -98,7 +98,8 @@ impl PropagationThread {
}
{
let cache_read = self.cache.read();
for (peer_id, (blocks_known, _)) in &cache_read.blocks_known_by_peer
for (peer_id, (blocks_known, _)) in
cache_read.blocks_known_by_peer.iter()
{
// peer that isn't asking for that block
let cond = blocks_known.peek(&block_id);
Expand Down
Loading

0 comments on commit b63c8b7

Please sign in to comment.