replace lru with schnellru (#14539)
koushiro authored Jul 9, 2023
1 parent 3fee5c7 commit 2cc2646
Showing 11 changed files with 58 additions and 71 deletions.
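Every file below applies the same mechanical mapping from lru's API to schnellru's. As a rough orientation before the hunks, here is a minimal, self-contained sketch of how the two crates line up, assuming the lru = "0.10" and schnellru = "0.2" versions from the Cargo.toml changes; the u32 -> String cache is purely illustrative and not taken from this commit:

use lru::LruCache;
use schnellru::{ByLength, LruMap};
use std::num::NonZeroUsize;

fn main() {
    // lru 0.10: the capacity is a NonZeroUsize and `put` returns the previous value.
    let mut old_cache: LruCache<u32, String> =
        LruCache::new(NonZeroUsize::new(8).expect("capacity is not zero"));
    assert!(old_cache.put(1, "one".into()).is_none()); // no previous value for this key
    assert!(old_cache.contains(&1));
    assert_eq!(old_cache.get(&1), Some(&"one".to_string())); // `get` yields Option<&V>

    // schnellru 0.2: a limiter (here ByLength) bounds the map, `insert` returns a
    // bool (false if the limiter rejects the entry), and `get` yields Option<&mut V>.
    let mut new_cache: LruMap<u32, String> = LruMap::new(ByLength::new(8));
    assert!(new_cache.insert(1, "one".into()));
    assert!(new_cache.get(&1).is_some()); // also marks the entry as most recently used
}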
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion client/executor/Cargo.toml
@@ -14,8 +14,8 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
lru = "0.10.0"
parking_lot = "0.12.1"
schnellru = "0.2.1"
tracing = "0.1.29"

codec = { package = "parity-scale-codec", version = "3.6.1" }
17 changes: 8 additions & 9 deletions client/executor/src/wasm_runtime.rs
@@ -22,24 +22,24 @@
//! components of the runtime that are expensive to initialize.

use crate::error::{Error, WasmError};

use codec::Decode;
use lru::LruCache;
use parking_lot::Mutex;
use sc_executor_common::{
runtime_blob::RuntimeBlob,
wasm_runtime::{HeapAllocStrategy, WasmInstance, WasmModule},
};
use schnellru::{ByLength, LruMap};
use sp_core::traits::{Externalities, FetchRuntimeCode, RuntimeCode};
use sp_version::RuntimeVersion;
use sp_wasm_interface::HostFunctions;

use std::{
num::NonZeroUsize,
panic::AssertUnwindSafe,
path::{Path, PathBuf},
sync::Arc,
};

use sp_wasm_interface::HostFunctions;

/// Specification of different methods of executing the runtime Wasm code.
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
pub enum WasmExecutionMethod {
@@ -163,7 +163,7 @@ pub struct RuntimeCache {
/// A cache of runtimes along with metadata.
///
/// Runtimes sorted by recent usage. The most recently used is at the front.
runtimes: Mutex<LruCache<VersionedRuntimeId, Arc<VersionedRuntime>>>,
runtimes: Mutex<LruMap<VersionedRuntimeId, Arc<VersionedRuntime>>>,
/// The size of the instances cache for each runtime.
max_runtime_instances: usize,
cache_path: Option<PathBuf>,
@@ -185,9 +185,8 @@ impl RuntimeCache {
cache_path: Option<PathBuf>,
runtime_cache_size: u8,
) -> RuntimeCache {
let cap =
NonZeroUsize::new(runtime_cache_size.max(1) as usize).expect("cache size is not zero");
RuntimeCache { runtimes: Mutex::new(LruCache::new(cap)), max_runtime_instances, cache_path }
let cap = ByLength::new(runtime_cache_size.max(1) as u32);
RuntimeCache { runtimes: Mutex::new(LruMap::new(cap)), max_runtime_instances, cache_path }
}

/// Prepares a WASM module instance and executes given function for it.
@@ -275,7 +274,7 @@ impl RuntimeCache {
let versioned_runtime = Arc::new(result?);

// Save new versioned wasm runtime in cache
runtimes.put(versioned_runtime_id, versioned_runtime.clone());
runtimes.insert(versioned_runtime_id, versioned_runtime.clone());

versioned_runtime
};
2 changes: 1 addition & 1 deletion client/network-gossip/Cargo.toml
@@ -19,7 +19,7 @@ futures = "0.3.21"
futures-timer = "3.0.1"
libp2p = "0.51.3"
log = "0.4.17"
lru = "0.10.0"
schnellru = "0.2.1"
tracing = "0.1.29"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../network/" }
29 changes: 13 additions & 16 deletions client/network-gossip/src/state_machine.rs
@@ -20,12 +20,13 @@ use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContex

use ahash::AHashSet;
use libp2p::PeerId;
use lru::LruCache;
use schnellru::{ByLength, LruMap};

use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::types::ProtocolName;
use sc_network_common::role::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::Instant};
use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};

// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
@@ -36,7 +37,7 @@ use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::
//
// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then
// this cache should take about 256 KB of memory.
const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192;
const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;

const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);

@@ -155,7 +156,7 @@ where
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>,
known_messages: LruMap<B::Hash, ()>,
protocol: ProtocolName,
validator: Arc<dyn Validator<B>>,
next_broadcast: Instant,
@@ -181,11 +182,7 @@ impl<B: BlockT> ConsensusGossip<B> {
ConsensusGossip {
peers: HashMap::new(),
messages: Default::default(),
known_messages: {
let cap = NonZeroUsize::new(KNOWN_MESSAGES_CACHE_SIZE)
.expect("cache capacity is not zero");
LruCache::new(cap)
},
known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
protocol,
validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
@@ -216,7 +213,7 @@ impl<B: BlockT> ConsensusGossip<B> {
message: Vec<u8>,
sender: Option<PeerId>,
) {
if self.known_messages.put(message_hash, ()).is_none() {
if self.known_messages.insert(message_hash, ()) {
self.messages.push(MessageEntry { message_hash, topic, message, sender });

if let Some(ref metrics) = self.metrics {
@@ -313,7 +310,7 @@ impl<B: BlockT> ConsensusGossip<B> {
);

for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| known_messages.contains(h));
peer.known_messages.retain(|h| known_messages.get(h).is_some());
}
}

@@ -348,7 +345,7 @@ impl<B: BlockT> ConsensusGossip<B> {
for message in messages {
let message_hash = HashFor::<B>::hash(&message[..]);

if self.known_messages.contains(&message_hash) {
if self.known_messages.get(&message_hash).is_some() {
tracing::trace!(
target: "gossip",
%who,
@@ -545,7 +542,7 @@ mod tests {

macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.put($hash, ()).is_none() {
if $consensus.known_messages.insert($hash, ()) {
$consensus.messages.push(MessageEntry {
message_hash: $hash,
topic: $topic,
@@ -720,8 +717,8 @@

push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2);
consensus.known_messages.put(m1_hash, ());
consensus.known_messages.put(m2_hash, ());
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());

consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 2);
@@ -734,7 +731,7 @@
assert_eq!(consensus.messages.len(), 1);
// known messages are only pruned based on size.
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.contains(&m2_hash));
assert!(consensus.known_messages.get(&m2_hash).is_some());
}

#[test]
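Because the gossip code only needs a bounded membership set (the cached value type is ()), the lru calls put(hash, ()).is_none() and contains(&hash) in the hunks above turn into insert(hash, ()) and get(&hash).is_some() on the schnellru map. A small self-contained sketch of that usage pattern, with an arbitrary [u8; 32] hash standing in for B::Hash:

use schnellru::{ByLength, LruMap};

fn main() {
    // Bounded "known messages" set: only the keys matter, so the value type is ().
    let mut known_messages: LruMap<[u8; 32], ()> = LruMap::new(ByLength::new(8192));

    let message_hash = [0u8; 32];
    if known_messages.insert(message_hash, ()) {
        // Entry stored; this bool is what the diff uses where `put(..).is_none()` was.
    }

    // Membership check; `get` also refreshes the entry's recency, and the ByLength
    // limiter keeps the number of entries capped (8192 here, as in the cache above).
    assert!(known_messages.get(&message_hash).is_some());
    assert_eq!(known_messages.len(), 1);
}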
2 changes: 1 addition & 1 deletion client/network/sync/Cargo.toml
@@ -24,9 +24,9 @@ futures = "0.3.21"
futures-timer = "3.0.2"
libp2p = "0.51.3"
log = "0.4.17"
lru = "0.10.0"
mockall = "0.11.3"
prost = "0.11"
schnellru = "0.2.1"
smallvec = "1.8.0"
thiserror = "1.0"
fork-tree = { version = "3.0.0", path = "../../../utils/fork-tree" }
16 changes: 7 additions & 9 deletions client/network/sync/src/block_request_handler.rs
@@ -26,8 +26,8 @@ use codec::{Decode, Encode};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::debug;
use lru::LruCache;
use prost::Message;
use schnellru::{ByLength, LruMap};

use sc_client_api::BlockBackend;
use sc_network::{
@@ -44,7 +44,6 @@ use sp_runtime::{
use std::{
cmp::min,
hash::{Hash, Hasher},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};
@@ -137,7 +136,7 @@ pub struct BlockRequestHandler<B: BlockT, Client> {
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
seen_requests: LruCache<SeenRequestsKey<B>, SeenRequestsValue>,
seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
}

impl<B, Client> BlockRequestHandler<B, Client>
@@ -167,9 +166,8 @@
);
protocol_config.inbound_queue = Some(tx);

let capacity =
NonZeroUsize::new(num_peer_hint.max(1) * 2).expect("cache capacity is not zero");
let seen_requests = LruCache::new(capacity);
let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
let seen_requests = LruMap::new(capacity);

(Self { client, request_receiver, seen_requests }, protocol_config)
}
@@ -236,7 +234,7 @@
.difference(BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION)
.is_empty();

match self.seen_requests.get_mut(&key) {
match self.seen_requests.get(&key) {
Some(SeenRequestsValue::First) => {},
Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
*requests = requests.saturating_add(1);
@@ -250,7 +248,7 @@
}
},
None => {
self.seen_requests.put(key.clone(), SeenRequestsValue::First);
self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
},
}

@@ -277,7 +275,7 @@
.iter()
.any(|b| !b.header.is_empty() || !b.body.is_empty() || b.is_empty_justification)
{
if let Some(value) = self.seen_requests.get_mut(&key) {
if let Some(value) = self.seen_requests.get(&key) {
// If this is the first time we have processed this request, we need to change
// it to `Fulfilled`.
if let SeenRequestsValue::First = value {
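A further difference shows up in this handler: the lru get_mut(&key) calls become plain get(&key), because schnellru's get already hands back a mutable reference. A hedged sketch of that pattern; Seen is a hypothetical stand-in for SeenRequestsValue, not code from this commit:

use schnellru::{ByLength, LruMap};

// Hypothetical counter type mirroring the shape of SeenRequestsValue.
enum Seen {
    First,
    Fulfilled(u32),
}

fn main() {
    let mut seen_requests: LruMap<u64, Seen> = LruMap::new(ByLength::new(16));
    seen_requests.insert(42, Seen::First);

    // `get` returns Option<&mut V>, so the entry can be promoted in place,
    // which is why the handler above no longer needs a separate `get_mut`.
    if let Some(value) = seen_requests.get(&42) {
        if let Seen::First = value {
            *value = Seen::Fulfilled(1);
        }
    }

    // Repeated requests just bump the counter through the same mutable access.
    if let Some(Seen::Fulfilled(requests)) = seen_requests.get(&42) {
        *requests = requests.saturating_add(1);
    }
    assert!(matches!(seen_requests.get(&42), Some(Seen::Fulfilled(2))));
}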
17 changes: 7 additions & 10 deletions client/network/sync/src/engine.rs
@@ -28,10 +28,10 @@ use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p::PeerId;
use lru::LruCache;
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
use schnellru::{ByLength, LruMap};

use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
@@ -239,7 +239,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
default_peers_set_num_light: usize,

/// A cache for the data that was associated to a block announcement.
block_announce_data_cache: LruCache<B::Hash, Vec<u8>>,
block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,
@@ -294,12 +294,9 @@
} else {
net_config.network_config.max_blocks_per_request
};
let cache_capacity = NonZeroUsize::new(
(net_config.network_config.default_peers_set.in_peers as usize +
net_config.network_config.default_peers_set.out_peers as usize)
.max(1),
)
.expect("cache capacity is not zero");
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
let important_peers = {
let mut imp_p = HashSet::new();
for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
@@ -381,7 +378,7 @@
network_service,
peers: HashMap::new(),
evicted: HashSet::new(),
block_announce_data_cache: LruCache::new(cache_capacity),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
block_announce_protocol_name,
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
@@ -465,7 +462,7 @@

if let Some(data) = announce.data {
if !data.is_empty() {
self.block_announce_data_cache.put(announce.header.hash(), data);
self.block_announce_data_cache.insert(announce.header.hash(), data);
}
}
},