Skip to content

Commit

Permalink
Improve performance on get_or_insert_with
Browse files Browse the repository at this point in the history
Avoid to update a waiter in the concurrent hash table if it already presents.
  • Loading branch information
tatsuya6502 committed Feb 8, 2022
1 parent 8896c33 commit 2858fff
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 41 deletions.
84 changes: 70 additions & 14 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ impl<K, V> Drop for BucketArray<K, V> {
}

impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
pub(crate) fn get<Q: ?Sized + Eq>(
pub(crate) fn get<Q>(
&self,
guard: &'g Guard,
hash: u64,
key: &Q,
) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError>
where
Q: Eq + ?Sized,
K: Borrow<Q>,
{
let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| {
Expand Down Expand Up @@ -108,15 +109,17 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

pub(crate) fn remove_if<Q: ?Sized + Eq, F: FnMut(&K, &V) -> bool>(
pub(crate) fn remove_if<Q, F>(
&self,
guard: &'g Guard,
hash: u64,
key: &Q,
mut condition: F,
) -> Result<Shared<'g, Bucket<K, V>>, F>
where
Q: Eq + ?Sized,
K: Borrow<Q>,
F: FnMut(&K, &V) -> bool,
{
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() }
Expand Down Expand Up @@ -160,15 +163,68 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

pub(crate) fn insert_if_not_present<F>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
) -> Result<Shared<'g, Bucket<K, V>>, InsertOrModifyState<K, V, F>>
where
F: FnOnce() -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();

let new_bucket = {
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
let this_key = &this_bucket_ref.key;

if this_key.borrow() != state.key() {
maybe_state = Some(state);

return ProbeLoopAction::Continue;
}

if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
return ProbeLoopAction::Return(this_bucket_ptr);
}
}

state.into_insert_bucket()
};

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));

ProbeLoopAction::Reload
} else {
ProbeLoopAction::Return(this_bucket_ptr)
}
});

loop_result.returned().ok_or_else(|| maybe_state.unwrap())
}

// https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
#[allow(clippy::type_complexity)]
pub(crate) fn insert_or_modify<F: FnOnce() -> V, G: FnMut(&K, &V) -> V>(
pub(crate) fn insert_or_modify<F, G>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut modifier: G,
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)> {
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
Expand Down Expand Up @@ -265,15 +321,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe_loop<
fn probe_loop<F, T>(&self, guard: &'g Guard, hash: u64, mut f: F) -> ProbeLoopResult<T>
where
F: FnMut(usize, &Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>) -> ProbeLoopAction<T>,
T,
>(
&self,
guard: &'g Guard,
hash: u64,
mut f: F,
) -> ProbeLoopResult<T> {
{
let offset = hash as usize & (self.buckets.len() - 1);

for i in
Expand All @@ -299,14 +350,15 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
ProbeLoopResult::LoopEnded
}

pub(crate) fn rehash<H: BuildHasher>(
pub(crate) fn rehash<H>(
&self,
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
where
K: Hash + Eq,
H: BuildHasher,
{
let next_array = self.next_array(guard, rehash_op);

Expand Down Expand Up @@ -515,7 +567,11 @@ impl<V, F: FnOnce() -> V> ValueOrFunction<V, F> {
}
}

pub(crate) fn hash<K: ?Sized + Hash, H: BuildHasher>(build_hasher: &H, key: &K) -> u64 {
pub(crate) fn hash<K, H>(build_hasher: &H, key: &K) -> u64
where
K: ?Sized + Hash,
H: BuildHasher,
{
let mut hasher = build_hasher.build_hasher();
key.hash(&mut hasher);

Expand Down
101 changes: 82 additions & 19 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ pub(crate) struct BucketArrayRef<'a, K, V, S> {
pub(crate) len: &'a AtomicUsize,
}

impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
pub(crate) fn get_key_value_and<Q: Hash + Eq + ?Sized, F: FnOnce(&K, &V) -> T, T>(
&self,
key: &Q,
hash: u64,
with_entry: F,
) -> Option<T>
impl<'a, K, V, S> BucketArrayRef<'a, K, V, S>
where
K: Hash + Eq,
S: BuildHasher,
{
pub(crate) fn get_key_value_and<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
F: FnOnce(&K, &V) -> T,
{
let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
Expand Down Expand Up @@ -60,20 +61,18 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
result
}

pub(crate) fn remove_entry_if_and<
Q: Hash + Eq + ?Sized,
F: FnMut(&K, &V) -> bool,
G: FnOnce(&K, &V) -> T,
T,
>(
pub(crate) fn remove_entry_if_and<Q, F, G, T>(
&self,
key: &Q,
hash: u64,
mut condition: F,
with_previous_entry: G,
) -> Option<T>
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
F: FnMut(&K, &V) -> bool,
G: FnOnce(&K, &V) -> T,
{
let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
Expand Down Expand Up @@ -127,19 +126,83 @@ impl<'a, K: Hash + Eq, V, S: BuildHasher> BucketArrayRef<'a, K, V, S> {
result
}

pub(crate) fn insert_with_or_modify_entry_and<
pub(crate) fn insert_if_not_present_and<F, G, T>(
&self,
key: K,
hash: u64,
on_insert: F,
with_previous_entry: G,
) -> Option<T>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
H: FnOnce(&K, &V) -> T,
T,
>(
G: FnOnce(&K, &V) -> T,
{
let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
let mut bucket_array_ref = current_ref;
let mut state = InsertOrModifyState::New(key, on_insert);

let result;

loop {
loop {
let rehash_op = RehashOp::new(
bucket_array_ref.capacity(),
&bucket_array_ref.tombstone_count,
self.len,
);
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
}

match bucket_array_ref.insert_if_not_present(guard, hash, state) {
Ok(previous_bucket_ptr) => {
if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
} else {
let Bucket {
key,
maybe_value: value,
} = previous_bucket_ref;
result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));
}
} else {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
}

break;
}
Err(s) => {
state = s;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}

self.swing(guard, current_ref, bucket_array_ref);

result
}

pub(crate) fn insert_with_or_modify_entry_and<T, F, G, H>(
&self,
key: K,
hash: u64,
on_insert: F,
mut on_modify: G,
with_old_entry: H,
) -> Option<T> {
) -> Option<T>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
H: FnOnce(&K, &V) -> T,
{
let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
let mut bucket_array_ref = current_ref;
Expand Down
44 changes: 42 additions & 2 deletions src/cht/segment/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
///
/// [`Some`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
///
/// Moka
#[inline]
pub(crate) fn insert_with_or_modify<F, G>(
&self,
Expand Down Expand Up @@ -478,6 +476,27 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {

result
}

#[inline]
pub(crate) fn insert_if_not_present(&self, key: K, value: V) -> Option<V>
where
V: Clone,
{
let hash = bucket::hash(&self.build_hasher, &key);

let result = self.bucket_array_ref(hash).insert_if_not_present_and(
key,
hash,
|| value,
|_, v| v.clone(),
);

if result.is_none() {
self.len.fetch_add(1, Ordering::Relaxed);
}

result
}
}

impl<K, V, S> Drop for HashMap<K, V, S> {
Expand Down Expand Up @@ -583,4 +602,25 @@ mod tests {
assert!(map.is_empty());
assert_eq!(map.len(), 0);
}

#[test]
fn insert_if_not_present() {
let map =
HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());

assert_eq!(map.insert_if_not_present("foo", 5), None);
assert_eq!(map.get("foo"), Some(5));

assert_eq!(map.insert_if_not_present("foo", 6), Some(5));
assert_eq!(map.get("foo"), Some(5));

assert_eq!(map.remove("foo"), Some(5));

assert_eq!(map.insert_if_not_present("foo", 7), None);
assert_eq!(map.get("foo"), Some(7));

assert_eq!(map.remove("foo"), Some(7));
assert!(map.is_empty());
assert_eq!(map.len(), 0);
}
}
4 changes: 1 addition & 3 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ where
) -> Option<Waiter<V>> {
let key = Arc::clone(key);
let waiter = TrioArc::clone(waiter);

self.waiters
.insert_with_or_modify((key, type_id), || waiter, |_, w| TrioArc::clone(w))
self.waiters.insert_if_not_present((key, type_id), waiter)
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ where
) -> Option<Waiter<V>> {
let key = Arc::clone(key);
let waiter = TrioArc::clone(waiter);

self.waiters
.insert_with_or_modify((key, type_id), || waiter, |_, w| TrioArc::clone(w))
self.waiters.insert_if_not_present((key, type_id), waiter)
}
}

0 comments on commit 2858fff

Please sign in to comment.