Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use schnellru crate instead of lru in massa-protocol-worker #3974

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ num_enum = "0.5"
peernet = { git = "https://github.com/massalabs/PeerNet", branch = "generic_peer_id" }
tempfile = { version = "3.3", optional = true } # use with testing feature
rayon = "1.7.0"
lru = "0.10.0"
schnellru = "0.2.1"

# modules Custom
massa_hash = { path = "../massa-hash" }
Expand Down
14 changes: 7 additions & 7 deletions massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use parking_lot::RwLock;
use peernet::transports::TcpOutConnectionConfig;
use peernet::{peer::PeerConnectionType, transports::OutConnectionConfig};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{collections::HashMap, net::IpAddr};
use std::{num::NonZeroUsize, sync::Arc};
use std::{thread::JoinHandle, time::Duration};
use tracing::{info, warn};

Expand Down Expand Up @@ -87,17 +87,17 @@ pub(crate) fn start_connectivity_thread(
let total_in_slots = config.peers_categories.values().map(|v| v.max_in_connections_post_handshake).sum::<usize>() + config.default_category_info.max_in_connections_post_handshake;
let total_out_slots = config.peers_categories.values().map(| v| v.target_out_connections).sum::<usize>() + config.default_category_info.target_out_connections;
let operation_cache = Arc::new(RwLock::new(OperationCache::new(
NonZeroUsize::new(config.max_known_ops_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_blocks_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap(),
)));
let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new(
NonZeroUsize::new(config.max_known_endorsements_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_endorsements_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap()
)));

let block_cache = Arc::new(RwLock::new(BlockCache::new(
NonZeroUsize::new(config.max_known_blocks_size).unwrap(),
NonZeroUsize::new(total_in_slots + total_out_slots).unwrap(),
config.max_known_blocks_size.try_into().unwrap(),
(total_in_slots + total_out_slots).try_into().unwrap(),
)));

// Start handlers
Expand Down
34 changes: 21 additions & 13 deletions massa-protocol-worker/src/handlers/block_handler/cache.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::{num::NonZeroUsize, sync::Arc, time::Instant};
use std::{sync::Arc, time::Instant};

use lru::LruCache;
use massa_models::{block_header::SecuredHeader, block_id::BlockId};
use massa_protocol_exports::PeerId;
use parking_lot::RwLock;
use schnellru::{ByLength, LruMap};
use tracing::log::warn;

pub struct BlockCache {
pub checked_headers: LruCache<BlockId, SecuredHeader>,
pub checked_headers: LruMap<BlockId, SecuredHeader>,
#[allow(clippy::type_complexity)]
pub blocks_known_by_peer: LruCache<PeerId, (LruCache<BlockId, (bool, Instant)>, Instant)>,
pub max_known_blocks_by_peer: NonZeroUsize,
pub blocks_known_by_peer: LruMap<PeerId, (LruMap<BlockId, (bool, Instant)>, Instant)>,
pub max_known_blocks_by_peer: u32,
}

impl BlockCache {
Expand All @@ -20,22 +21,29 @@ impl BlockCache {
val: bool,
timeout: Instant,
) {
let (blocks, _) = self
let Ok((blocks, _)) = self
.blocks_known_by_peer
.get_or_insert_mut(from_peer_id.clone(), || {
(LruCache::new(self.max_known_blocks_by_peer), Instant::now())
});
.get_or_insert(from_peer_id.clone(), || {
(
LruMap::new(ByLength::new(self.max_known_blocks_by_peer)),
Instant::now(),
)
})
.ok_or(()) else {
warn!("blocks_known_by_peer limit reached");
return;
};
for block_id in block_ids {
blocks.put(*block_id, (val, timeout));
blocks.insert(*block_id, (val, timeout));
}
}
}

impl BlockCache {
pub fn new(max_known_blocks: NonZeroUsize, max_known_blocks_by_peer: NonZeroUsize) -> Self {
pub fn new(max_known_blocks: u32, max_known_blocks_by_peer: u32) -> Self {
Self {
checked_headers: LruCache::new(max_known_blocks),
blocks_known_by_peer: LruCache::new(max_known_blocks_by_peer),
checked_headers: LruMap::new(ByLength::new(max_known_blocks)),
blocks_known_by_peer: LruMap::new(ByLength::new(max_known_blocks_by_peer)),
max_known_blocks_by_peer,
}
}
Expand Down
19 changes: 10 additions & 9 deletions massa-protocol-worker/src/handlers/block_handler/propagation.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{collections::VecDeque, num::NonZeroUsize, thread::JoinHandle, time::Instant};
use std::{collections::VecDeque, thread::JoinHandle, time::Instant};

use crossbeam::channel::{Receiver, Sender};
use lru::LruCache;
use massa_logging::massa_trace;
use massa_models::{block_id::BlockId, prehash::PreHashSet};
use massa_protocol_exports::PeerId;
use massa_protocol_exports::{ProtocolConfig, ProtocolError};
use massa_storage::Storage;
use schnellru::{ByLength, LruMap};
use tracing::{debug, info, warn};

use crate::{
Expand Down Expand Up @@ -77,18 +77,18 @@ impl PropagationThread {
self.active_connections.get_peer_ids_connected();
for peer_id in peers {
if !peers_connected.contains(&peer_id) {
cache_write.blocks_known_by_peer.pop(&peer_id);
cache_write.blocks_known_by_peer.remove(&peer_id);
}
}
for peer_id in peers_connected {
if !cache_write.blocks_known_by_peer.contains(&peer_id) {
if cache_write.blocks_known_by_peer.peek(&peer_id).is_none() {
//TODO: Change to detect the connection before
cache_write.blocks_known_by_peer.put(
cache_write.blocks_known_by_peer.insert(
peer_id,
(
LruCache::new(
NonZeroUsize::new(self.config.max_node_known_blocks_size)
.expect("max_node_known_blocks_size in config must be > 0"),
LruMap::new(
ByLength::new(self.config.max_node_known_blocks_size.try_into()
.expect("max_node_known_blocks_size in config must be > 0")),
),
Instant::now(),
),
Expand All @@ -98,7 +98,8 @@ impl PropagationThread {
}
{
let cache_read = self.cache.read();
for (peer_id, (blocks_known, _)) in &cache_read.blocks_known_by_peer
for (peer_id, (blocks_known, _)) in
cache_read.blocks_known_by_peer.iter()
{
// peer that isn't asking for that block
let cond = blocks_known.peek(&block_id);
Expand Down
Loading