Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use partition lock for memory cache #936

Merged
merged 13 commits into from
May 29, 2023
9 changes: 7 additions & 2 deletions common_types/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

// custom hash mod

use std::hash::BuildHasher;

/* We compared the speed difference between murmur3 and ahash for a string of
length 10, and the results show that ahash has a clear advantage.
Average time to DefaultHash a string of length 10: 33.6364 nanoseconds
Expand All @@ -11,17 +13,20 @@
One of the reasons is as follows:
https://github.com/tkaitchuck/aHash/blob/master/README.md#goals-and-non-goals
*/
pub use ahash::AHasher;
use ahash::AHasher;
use byteorder::{ByteOrder, LittleEndian};
use murmur3::murmur3_x64_128;

/// Hash `bytes` down to a `u64` using murmur3 (x64, 128-bit variant).
///
/// The 128-bit murmur3 output is written into a 16-byte scratch buffer and
/// only its first 8 bytes are returned, decoded as a little-endian integer.
pub fn hash64(mut bytes: &[u8]) -> u64 {
    // Scratch space for the full 128-bit digest.
    let mut digest = [0u8; 16];
    // `0` is passed as the second argument of `murmur3_x64_128`.
    murmur3_x64_128(&mut bytes, 0, &mut digest);

    // in most cases we run on little endian target
    let low_half = &digest[..8];
    LittleEndian::read_u64(low_half)
}

/// Create an [`AHasher`] from an `ahash::RandomState` whose four seeds are
/// all fixed to zero.
pub fn build_fixed_seed_ahasher() -> AHasher {
    let fixed_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
    fixed_state.build_hasher()
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
57 changes: 33 additions & 24 deletions common_util/src/partitioned_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ use std::{
sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

use common_types::hash::AHasher;
use common_types::hash::build_fixed_seed_ahasher;
/// Simple partitioned `RwLock`
pub struct PartitionedRwLock<T> {
partitions: Vec<RwLock<T>>,
partition_mask: usize,
}

impl<T> PartitionedRwLock<T>
where
T: Clone,
{
pub fn new(t: T, partition_bit: usize) -> Self {
impl<T> PartitionedRwLock<T> {
pub fn new<F>(init_fn: F, partition_bit: usize) -> Self
where
F: Fn() -> T,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| RwLock::new(t.clone()))
.collect::<Vec<_>>();
// Create exactly `partition_num` locks (indices `0..=partition_mask`), so that
// every `hash & partition_mask` computed in `get_partition` is a valid index.
// Starting the range at 1 would leave the vector one element short.
let partitions = (0..partition_num)
    .map(|_| RwLock::new(init_fn()))
    .collect::<Vec<RwLock<T>>>();
Self {
partitions,
partition_mask: partition_num - 1,
Expand All @@ -42,7 +42,8 @@ where
}

fn get_partition<K: Eq + Hash>(&self, key: &K) -> &RwLock<T> {
let mut hasher = AHasher::default();
let mut hasher = build_fixed_seed_ahasher();

key.hash(&mut hasher);

&self.partitions[(hasher.finish() as usize) & self.partition_mask]
Expand All @@ -55,20 +56,21 @@ where
}

/// Simple partitioned `Mutex`
#[derive(Debug)]
pub struct PartitionedMutex<T> {
partitions: Vec<Mutex<T>>,
partition_mask: usize,
}

impl<T> PartitionedMutex<T>
where
T: Clone,
{
pub fn new(t: T, partition_bit: usize) -> Self {
impl<T> PartitionedMutex<T> {
pub fn new<F>(init_fn: F, partition_bit: usize) -> Self
where
F: Fn() -> T,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| Mutex::new(t.clone()))
.collect::<Vec<_>>();
.map(|_| Mutex::new(init_fn()))
.collect::<Vec<Mutex<T>>>();
Self {
partitions,
partition_mask: partition_num - 1,
Expand All @@ -82,7 +84,7 @@ where
}

fn get_partition<K: Eq + Hash>(&self, key: &K) -> &Mutex<T> {
let mut hasher = AHasher::default();
let mut hasher = build_fixed_seed_ahasher();
key.hash(&mut hasher);
&self.partitions[(hasher.finish() as usize) & self.partition_mask]
}
Expand All @@ -91,6 +93,11 @@ where
fn get_partition_by_index(&self, index: usize) -> &Mutex<T> {
&self.partitions[index]
}

/// This function should be marked with `#[cfg(test)]`, but there is [an issue](https://github.com/rust-lang/cargo/issues/8379) in cargo, so it is public for now.
pub fn get_all_partition(&self) -> &Vec<Mutex<T>> {
tanruixiang marked this conversation as resolved.
Show resolved Hide resolved
&self.partitions
}
}

#[cfg(test)]
Expand All @@ -101,7 +108,8 @@ mod tests {

#[test]
fn test_partitioned_rwlock() {
let test_locked_map = PartitionedRwLock::new(HashMap::new(), 4);
let init_hmap = HashMap::new;
let test_locked_map = PartitionedRwLock::new(init_hmap, 4);
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -118,7 +126,8 @@ mod tests {

#[test]
fn test_partitioned_mutex() {
let test_locked_map = PartitionedMutex::new(HashMap::new(), 4);
let init_hmap = HashMap::new;
let test_locked_map = PartitionedMutex::new(init_hmap, 4);
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -135,8 +144,8 @@ mod tests {

#[test]
fn test_partitioned_mutex_vis_different_partition() {
let tmp_vec: Vec<f32> = Vec::new();
let test_locked_map = PartitionedMutex::new(tmp_vec, 4);
let init_vec = Vec::<i32>::new;
let test_locked_map = PartitionedMutex::new(init_vec, 4);
let mutex_first = test_locked_map.get_partition_by_index(0);

let mut _tmp_data = mutex_first.lock().unwrap();
Expand All @@ -149,8 +158,8 @@ mod tests {

#[test]
fn test_partitioned_rwmutex_vis_different_partition() {
let tmp_vec: Vec<f32> = Vec::new();
let test_locked_map = PartitionedRwLock::new(tmp_vec, 4);
let init_vec = Vec::<i32>::new;
let test_locked_map = PartitionedRwLock::new(init_vec, 4);
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp = mutex_first.write().unwrap();
assert!(mutex_first.try_write().is_err());
Expand Down
103 changes: 26 additions & 77 deletions components/object_store/src/mem_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
//! 2. Builtin Partition to reduce lock contention

use std::{
collections::hash_map::{DefaultHasher, RandomState},
collections::hash_map::RandomState,
fmt::{self, Display},
hash::{Hash, Hasher},
num::NonZeroUsize,
ops::Range,
sync::{Arc, Mutex},
sync::Arc,
};

use async_trait::async_trait;
use bytes::Bytes;
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use common_util::partitioned_lock::PartitionedMutex;
use futures::stream::BoxStream;
use snafu::{OptionExt, Snafu};
use tokio::io::AsyncWrite;
Expand All @@ -37,54 +37,10 @@ impl WeightScale<String, Bytes> for CustomScale {
}
}

struct Partition {
inner: Mutex<CLruCache<String, Bytes, RandomState, CustomScale>>,
}

impl Partition {
fn new(mem_cap: NonZeroUsize) -> Self {
let cache = CLruCache::with_config(CLruCacheConfig::new(mem_cap).with_scale(CustomScale));

Self {
inner: Mutex::new(cache),
}
}
}

impl Partition {
fn get(&self, key: &str) -> Option<Bytes> {
let mut guard = self.inner.lock().unwrap();
guard.get(key).cloned()
}

fn peek(&self, key: &str) -> Option<Bytes> {
// FIXME: actually, here write lock is not necessary.
let guard = self.inner.lock().unwrap();
guard.peek(key).cloned()
}

fn insert(&self, key: String, value: Bytes) {
let mut guard = self.inner.lock().unwrap();
// don't care error now.
_ = guard.put_with_weight(key, value);
}

#[cfg(test)]
fn keys(&self) -> Vec<String> {
let guard = self.inner.lock().unwrap();
guard
.iter()
.map(|(key, _)| key)
.cloned()
.collect::<Vec<_>>()
}
}

pub struct MemCache {
/// Max memory this store can use
mem_cap: NonZeroUsize,
partitions: Vec<Arc<Partition>>,
partition_mask: usize,
inner: PartitionedMutex<CLruCache<String, Bytes, RandomState, CustomScale>>,
}

pub type MemCacheRef = Arc<MemCache>;
Expand All @@ -98,44 +54,38 @@ impl MemCache {
let cap_per_part = mem_cap
.checked_mul(NonZeroUsize::new(partition_num).unwrap())
.context(InvalidCapacity)?;
let partitions = (0..partition_num)
.map(|_| Arc::new(Partition::new(cap_per_part)))
.collect::<Vec<_>>();

Ok(Self {
mem_cap,
partitions,
partition_mask: partition_num - 1,
})
}

fn locate_partition(&self, key: &str) -> Arc<Partition> {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
self.partitions[hasher.finish() as usize & self.partition_mask].clone()
let inin_lru =
|| CLruCache::with_config(CLruCacheConfig::new(cap_per_part).with_scale(CustomScale));
let inner = PartitionedMutex::new(inin_lru, partition_bits);
Ok(Self { mem_cap, inner })
}

fn get(&self, key: &str) -> Option<Bytes> {
let partition = self.locate_partition(key);
partition.get(key)
self.inner.lock(&key).get(key).cloned()
}

fn peek(&self, key: &str) -> Option<Bytes> {
let partition = self.locate_partition(key);
partition.peek(key)
self.inner.lock(&key).peek(key).cloned()
}

fn insert(&self, key: String, value: Bytes) {
let partition = self.locate_partition(&key);
partition.insert(key, value);
// don't care error now.
_ = self.inner.lock(&key).put_with_weight(key, value);
}

/// Give a description of the cache state.

#[cfg(test)]
fn keys(&self, part: &CLruCache<String, Bytes, RandomState, CustomScale>) -> Vec<String> {
    // Walk the cache entries in the order `iter()` yields them and copy out
    // each key; the cached values are ignored.
    let mut cached_keys = Vec::new();
    for (key, _value) in part.iter() {
        cached_keys.push(key.clone());
    }
    cached_keys
}

#[cfg(test)]
fn state_desc(&self) -> String {
self.partitions
self.inner
.get_all_partition()
.iter()
.map(|part| part.keys().join(","))
.map(|part| self.keys(&part.lock().unwrap()).join(","))
.enumerate()
.map(|(part_no, keys)| format!("{part_no}: [{keys}]"))
.collect::<Vec<_>>()
Expand All @@ -147,8 +97,7 @@ impl Display for MemCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemCache")
.field("mem_cap", &self.mem_cap)
.field("mask", &self.partition_mask)
.field("partitions", &self.partitions.len())
.field("partitions", &self.inner.get_all_partition().len())
.finish()
}
}
Expand Down Expand Up @@ -375,10 +324,10 @@ mod test {
.unwrap();

assert_eq!(
r#"0: []
1: [partition.sst-100-105]
2: []
3: [partition.sst-0-5]"#,
r#"0: [partition.sst-0-5]
1: []
2: [partition.sst-100-105]
3: []"#,
store.cache.as_ref().state_desc()
);

Expand Down