Skip to content

Commit

Permalink
Improve performance on get_or_insert_with
Browse files Browse the repository at this point in the history
- Avoid to update a waiter in the concurrent hash table if it already presents.
- Overall refactoring in cht `bucket` and `bucket_array` modules.
  • Loading branch information
tatsuya6502 committed Feb 11, 2022
1 parent 2858fff commit 6875fed
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 67 deletions.
133 changes: 85 additions & 48 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,25 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
K: Borrow<Q>,
{
let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| {
let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() }
{
this_bucket_ref
let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Not found.
return ProbeLoopAction::Return(Shared::null());
};

let this_key = &this_bucket_ref.key;

if this_key.borrow() != key {
if this_bucket_ref.key.borrow() != key {
// Different key. Try next bucket
return ProbeLoopAction::Continue;
}

let result_ptr = if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
this_bucket_ptr
if is_tombstone(this_bucket_ptr) {
// Not found. (It has been removed)
ProbeLoopAction::Return(Shared::null())
} else {
Shared::null()
};

ProbeLoopAction::Return(result_ptr)
// Found.
ProbeLoopAction::Return(this_bucket_ptr)
}
});

match loop_result {
Expand All @@ -122,27 +121,34 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
F: FnMut(&K, &V) -> bool,
{
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() }
{
this_bucket_ref
let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Nothing to remove.
return ProbeLoopAction::Return(Shared::null());
};

let this_key = &this_bucket_ref.key;

if this_key.borrow() != key {
// Different key. Try next bucket.
return ProbeLoopAction::Continue;
} else if this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 {
}

if is_tombstone(this_bucket_ptr) {
// Already removed.
return ProbeLoopAction::Return(Shared::null());
}

let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

if !condition(this_key, this_value) {
// Found but the condition is false. Do not remove.
return ProbeLoopAction::Return(Shared::null());
}

// Found and the condition is true. Remove it. (Make it a tombstone)

let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

match this_bucket.compare_and_set_weak(
Expand All @@ -151,7 +157,9 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(Ordering::Release, Ordering::Relaxed),
guard,
) {
// Succeeded. Return the removed value. (can be null)
Ok(_) => ProbeLoopAction::Return(new_bucket_ptr),
// Failed. Reload to retry.
Err(_) => ProbeLoopAction::Reload,
}
});
Expand All @@ -168,7 +176,7 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
) -> Result<Shared<'g, Bucket<K, V>>, InsertOrModifyState<K, V, F>>
) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
where
F: FnOnce() -> V,
{
Expand All @@ -177,23 +185,24 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();

let new_bucket = {
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
let this_key = &this_bucket_ref.key;

if this_key.borrow() != state.key() {
maybe_state = Some(state);

return ProbeLoopAction::Continue;
}
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
if this_bucket_ref.key.borrow() != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
}

if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
return ProbeLoopAction::Return(this_bucket_ptr);
}
if !is_tombstone(this_bucket_ptr) {
// Found. Return it.
return ProbeLoopAction::Return(InsertionResult::AlreadyPresent(
this_bucket_ptr,
));
}
}

state.into_insert_bucket()
};
// Not found or found a tombstone. Insert it.

let new_bucket = state.into_insert_bucket();

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
this_bucket_ptr,
Expand All @@ -202,10 +211,13 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));

ProbeLoopAction::Reload
} else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
// Inserted by replacing a tombstone.
ProbeLoopAction::Return(InsertionResult::ReplacedTombstone(this_bucket_ptr))
} else {
ProbeLoopAction::Return(this_bucket_ptr)
// Inserted.
ProbeLoopAction::Return(InsertionResult::Inserted)
}
});

Expand Down Expand Up @@ -235,22 +247,25 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
let this_key = &this_bucket_ref.key;

if this_key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);

return ProbeLoopAction::Continue;
}

if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
if is_tombstone(this_bucket_ptr) {
// Found a tombstone for this key. Replace it.
(state.into_insert_bucket(), None)
} else {
// Found. Modify it.
let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };
let new_value = modifier(this_key, this_value);

let (new_bucket, insert_value) = state.into_modify_bucket(new_value);

(new_bucket, Some(insert_value))
} else {
(state.into_insert_bucket(), None)
}
} else {
// Not found. Insert it.
(state.into_insert_bucket(), None)
};

Expand All @@ -260,13 +275,14 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(Ordering::Release, Ordering::Relaxed),
guard,
) {
// Failed. Reload to retry.
maybe_state = Some(InsertOrModifyState::from_bucket_value(
new,
maybe_insert_value,
));

ProbeLoopAction::Reload
} else {
// Succeeded. Return the previous value. (can be null)
ProbeLoopAction::Return(this_bucket_ptr)
}
});
Expand All @@ -283,8 +299,8 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
bucket_ptr: Shared<'g, Bucket<K, V>>,
) -> Option<usize> {
assert!(!bucket_ptr.is_null());
assert_eq!(bucket_ptr.tag() & SENTINEL_TAG, 0);
assert_ne!(bucket_ptr.tag() & BORROWED_TAG, 0);
assert!(!is_sentinel(bucket_ptr));
assert!(is_borrowed(bucket_ptr));

let key = &unsafe { bucket_ptr.deref() }.key;

Expand All @@ -294,12 +310,12 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
return ProbeLoopAction::Return(None);
} else if this_key != key {
return ProbeLoopAction::Continue;
} else if this_bucket_ptr.tag() & BORROWED_TAG == 0 {
} else if !is_borrowed(this_bucket_ptr) {
return ProbeLoopAction::Return(None);
}
}

if this_bucket_ptr.is_null() && bucket_ptr.tag() & TOMBSTONE_TAG != 0 {
if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
} else if this_bucket
.compare_and_set_weak(
Expand Down Expand Up @@ -335,7 +351,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
loop {
let this_bucket_ptr = this_bucket.load_consume(guard);

if this_bucket_ptr.tag() & SENTINEL_TAG != 0 {
if is_sentinel(this_bucket_ptr) {
return ProbeLoopResult::FoundSentinelTag;
}

Expand Down Expand Up @@ -368,7 +384,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
loop {
let this_bucket_ptr = this_bucket.load_consume(guard);

if this_bucket_ptr.tag() & SENTINEL_TAG != 0 {
if is_sentinel(this_bucket_ptr) {
break;
}

Expand All @@ -379,7 +395,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {

let next_bucket = &next_array.buckets[index];

while next_bucket_ptr.tag() & BORROWED_TAG != 0
while is_borrowed(next_bucket_ptr)
&& next_bucket
.compare_and_set_weak(
next_bucket_ptr,
Expand Down Expand Up @@ -411,7 +427,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
{
// TODO: If else, we may need to count tombstone.
if !this_bucket_ptr.is_null()
&& this_bucket_ptr.tag() & TOMBSTONE_TAG != 0
&& is_tombstone(this_bucket_ptr)
&& maybe_state.is_none()
{
unsafe { defer_destroy_bucket(guard, this_bucket_ptr) };
Expand Down Expand Up @@ -599,6 +615,12 @@ impl<T> ProbeLoopResult<T> {
}
}

pub(crate) enum InsertionResult<'g, K, V> {
AlreadyPresent(Shared<'g, Bucket<K, V>>),
Inserted,
ReplacedTombstone(Shared<'g, Bucket<K, V>>),
}

pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
guard: &'g Guard,
mut ptr: Shared<'g, Bucket<K, V>>,
Expand All @@ -608,7 +630,7 @@ pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
guard.defer_unchecked(move || {
atomic::fence(Ordering::Acquire);

if ptr.tag() & TOMBSTONE_TAG == 0 {
if !is_tombstone(ptr) {
ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr());
}

Expand All @@ -621,7 +643,7 @@ pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>(
mut ptr: Shared<'g, Bucket<K, V>>,
) {
assert!(!ptr.is_null());
assert_ne!(ptr.tag() & TOMBSTONE_TAG, 0);
assert!(is_tombstone(ptr));

atomic::fence(Ordering::Acquire);
// read the value now, but defer its destruction for later
Expand Down Expand Up @@ -690,6 +712,21 @@ pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when c
pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed
pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table

#[inline]
pub(crate) fn is_sentinel<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & SENTINEL_TAG != 0
}

#[inline]
pub(crate) fn is_tombstone<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & TOMBSTONE_TAG != 0
}

#[inline]
pub(crate) fn is_borrowed<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & BORROWED_TAG != 0
}

// #[cfg(test)]
// mod tests {
// use super::*;
Expand Down
43 changes: 24 additions & 19 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ where
key: K,
hash: u64,
on_insert: F,
with_previous_entry: G,
with_existing_entry: G,
) -> Option<T>
where
F: FnOnce() -> V,
G: FnOnce(&K, &V) -> T,
{
use bucket::InsertionResult;

let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
let mut bucket_array_ref = current_ref;
Expand All @@ -158,23 +160,26 @@ where
}

match bucket_array_ref.insert_if_not_present(guard, hash, state) {
Ok(previous_bucket_ptr) => {
if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
} else {
let Bucket {
key,
maybe_value: value,
} = previous_bucket_ref;
result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));
}
} else {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
}

Ok(InsertionResult::AlreadyPresent(current_bucket_ptr)) => {
let current_bucket_ref = unsafe { current_bucket_ptr.as_ref() }.unwrap();
assert!(!bucket::is_tombstone(current_bucket_ptr));
let Bucket {
key,
maybe_value: value,
} = current_bucket_ref;
result = Some(with_existing_entry(key, unsafe { &*value.as_ptr() }));
break;
}
Ok(InsertionResult::Inserted) => {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
break;
}
Ok(InsertionResult::ReplacedTombstone(previous_bucket_ptr)) => {
assert!(bucket::is_tombstone(previous_bucket_ptr));
self.len.fetch_add(1, Ordering::Relaxed);
unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) };
result = None;
break;
}
Err(s) => {
Expand Down Expand Up @@ -226,7 +231,7 @@ where
match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
Ok(previous_bucket_ptr) => {
if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
if previous_bucket_ptr.tag() & bucket::TOMBSTONE_TAG != 0 {
if bucket::is_tombstone(previous_bucket_ptr) {
self.len.fetch_add(1, Ordering::Relaxed);
result = None;
} else {
Expand Down

0 comments on commit 6875fed

Please sign in to comment.