Skip to content

Commit

Permalink
Merge pull request #88 from moka-rs/efficient-atomic-insert
Browse files Browse the repository at this point in the history
Improve performance on `get_or_insert_with`
  • Loading branch information
tatsuya6502 authored Feb 11, 2022
2 parents 89736fa + 6875fed commit 8bd97fb
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 73 deletions.
183 changes: 138 additions & 45 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,36 @@ impl<K, V> Drop for BucketArray<K, V> {
}

impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
pub(crate) fn get<Q: ?Sized + Eq>(
pub(crate) fn get<Q>(
&self,
guard: &'g Guard,
hash: u64,
key: &Q,
) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError>
where
Q: Eq + ?Sized,
K: Borrow<Q>,
{
let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| {
let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() }
{
this_bucket_ref
let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Not found.
return ProbeLoopAction::Return(Shared::null());
};

let this_key = &this_bucket_ref.key;

if this_key.borrow() != key {
if this_bucket_ref.key.borrow() != key {
// Different key. Try next bucket
return ProbeLoopAction::Continue;
}

let result_ptr = if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
this_bucket_ptr
if is_tombstone(this_bucket_ptr) {
// Not found. (It has been removed)
ProbeLoopAction::Return(Shared::null())
} else {
Shared::null()
};

ProbeLoopAction::Return(result_ptr)
// Found.
ProbeLoopAction::Return(this_bucket_ptr)
}
});

match loop_result {
Expand All @@ -108,38 +108,47 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

pub(crate) fn remove_if<Q: ?Sized + Eq, F: FnMut(&K, &V) -> bool>(
pub(crate) fn remove_if<Q, F>(
&self,
guard: &'g Guard,
hash: u64,
key: &Q,
mut condition: F,
) -> Result<Shared<'g, Bucket<K, V>>, F>
where
Q: Eq + ?Sized,
K: Borrow<Q>,
F: FnMut(&K, &V) -> bool,
{
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let this_bucket_ref = if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() }
{
this_bucket_ref
let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Nothing to remove.
return ProbeLoopAction::Return(Shared::null());
};

let this_key = &this_bucket_ref.key;

if this_key.borrow() != key {
// Different key. Try next bucket.
return ProbeLoopAction::Continue;
} else if this_bucket_ptr.tag() & TOMBSTONE_TAG != 0 {
}

if is_tombstone(this_bucket_ptr) {
// Already removed.
return ProbeLoopAction::Return(Shared::null());
}

let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

if !condition(this_key, this_value) {
// Found but the condition is false. Do not remove.
return ProbeLoopAction::Return(Shared::null());
}

// Found and the condition is true. Remove it. (Make it a tombstone)

let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

match this_bucket.compare_and_set_weak(
Expand All @@ -148,7 +157,9 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(Ordering::Release, Ordering::Relaxed),
guard,
) {
// Succeeded. Return the removed value. (can be null)
Ok(_) => ProbeLoopAction::Return(new_bucket_ptr),
// Failed. Reload to retry.
Err(_) => ProbeLoopAction::Reload,
}
});
Expand All @@ -160,15 +171,72 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

pub(crate) fn insert_if_not_present<F>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
where
F: FnOnce() -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();

if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
if this_bucket_ref.key.borrow() != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
}

if !is_tombstone(this_bucket_ptr) {
// Found. Return it.
return ProbeLoopAction::Return(InsertionResult::AlreadyPresent(
this_bucket_ptr,
));
}
}

// Not found or found a tombstone. Insert it.

let new_bucket = state.into_insert_bucket();

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));
ProbeLoopAction::Reload
} else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
// Inserted by replacing a tombstone.
ProbeLoopAction::Return(InsertionResult::ReplacedTombstone(this_bucket_ptr))
} else {
// Inserted.
ProbeLoopAction::Return(InsertionResult::Inserted)
}
});

loop_result.returned().ok_or_else(|| maybe_state.unwrap())
}

// https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
#[allow(clippy::type_complexity)]
pub(crate) fn insert_or_modify<F: FnOnce() -> V, G: FnMut(&K, &V) -> V>(
pub(crate) fn insert_or_modify<F, G>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut modifier: G,
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)> {
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
Expand All @@ -179,22 +247,25 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
let this_key = &this_bucket_ref.key;

if this_key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);

return ProbeLoopAction::Continue;
}

if this_bucket_ptr.tag() & TOMBSTONE_TAG == 0 {
if is_tombstone(this_bucket_ptr) {
// Found a tombstone for this key. Replace it.
(state.into_insert_bucket(), None)
} else {
// Found. Modify it.
let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };
let new_value = modifier(this_key, this_value);

let (new_bucket, insert_value) = state.into_modify_bucket(new_value);

(new_bucket, Some(insert_value))
} else {
(state.into_insert_bucket(), None)
}
} else {
// Not found. Insert it.
(state.into_insert_bucket(), None)
};

Expand All @@ -204,13 +275,14 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(Ordering::Release, Ordering::Relaxed),
guard,
) {
// Failed. Reload to retry.
maybe_state = Some(InsertOrModifyState::from_bucket_value(
new,
maybe_insert_value,
));

ProbeLoopAction::Reload
} else {
// Succeeded. Return the previous value. (can be null)
ProbeLoopAction::Return(this_bucket_ptr)
}
});
Expand All @@ -227,8 +299,8 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
bucket_ptr: Shared<'g, Bucket<K, V>>,
) -> Option<usize> {
assert!(!bucket_ptr.is_null());
assert_eq!(bucket_ptr.tag() & SENTINEL_TAG, 0);
assert_ne!(bucket_ptr.tag() & BORROWED_TAG, 0);
assert!(!is_sentinel(bucket_ptr));
assert!(is_borrowed(bucket_ptr));

let key = &unsafe { bucket_ptr.deref() }.key;

Expand All @@ -238,12 +310,12 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
return ProbeLoopAction::Return(None);
} else if this_key != key {
return ProbeLoopAction::Continue;
} else if this_bucket_ptr.tag() & BORROWED_TAG == 0 {
} else if !is_borrowed(this_bucket_ptr) {
return ProbeLoopAction::Return(None);
}
}

if this_bucket_ptr.is_null() && bucket_ptr.tag() & TOMBSTONE_TAG != 0 {
if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
} else if this_bucket
.compare_and_set_weak(
Expand All @@ -265,15 +337,10 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe_loop<
fn probe_loop<F, T>(&self, guard: &'g Guard, hash: u64, mut f: F) -> ProbeLoopResult<T>
where
F: FnMut(usize, &Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>) -> ProbeLoopAction<T>,
T,
>(
&self,
guard: &'g Guard,
hash: u64,
mut f: F,
) -> ProbeLoopResult<T> {
{
let offset = hash as usize & (self.buckets.len() - 1);

for i in
Expand All @@ -284,7 +351,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
loop {
let this_bucket_ptr = this_bucket.load_consume(guard);

if this_bucket_ptr.tag() & SENTINEL_TAG != 0 {
if is_sentinel(this_bucket_ptr) {
return ProbeLoopResult::FoundSentinelTag;
}

Expand All @@ -299,14 +366,15 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
ProbeLoopResult::LoopEnded
}

pub(crate) fn rehash<H: BuildHasher>(
pub(crate) fn rehash<H>(
&self,
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
where
K: Hash + Eq,
H: BuildHasher,
{
let next_array = self.next_array(guard, rehash_op);

Expand All @@ -316,7 +384,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
loop {
let this_bucket_ptr = this_bucket.load_consume(guard);

if this_bucket_ptr.tag() & SENTINEL_TAG != 0 {
if is_sentinel(this_bucket_ptr) {
break;
}

Expand All @@ -327,7 +395,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {

let next_bucket = &next_array.buckets[index];

while next_bucket_ptr.tag() & BORROWED_TAG != 0
while is_borrowed(next_bucket_ptr)
&& next_bucket
.compare_and_set_weak(
next_bucket_ptr,
Expand Down Expand Up @@ -359,7 +427,7 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
{
// TODO: If else, we may need to count tombstone.
if !this_bucket_ptr.is_null()
&& this_bucket_ptr.tag() & TOMBSTONE_TAG != 0
&& is_tombstone(this_bucket_ptr)
&& maybe_state.is_none()
{
unsafe { defer_destroy_bucket(guard, this_bucket_ptr) };
Expand Down Expand Up @@ -515,7 +583,11 @@ impl<V, F: FnOnce() -> V> ValueOrFunction<V, F> {
}
}

pub(crate) fn hash<K: ?Sized + Hash, H: BuildHasher>(build_hasher: &H, key: &K) -> u64 {
pub(crate) fn hash<K, H>(build_hasher: &H, key: &K) -> u64
where
K: ?Sized + Hash,
H: BuildHasher,
{
let mut hasher = build_hasher.build_hasher();
key.hash(&mut hasher);

Expand Down Expand Up @@ -543,6 +615,12 @@ impl<T> ProbeLoopResult<T> {
}
}

pub(crate) enum InsertionResult<'g, K, V> {
AlreadyPresent(Shared<'g, Bucket<K, V>>),
Inserted,
ReplacedTombstone(Shared<'g, Bucket<K, V>>),
}

pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
guard: &'g Guard,
mut ptr: Shared<'g, Bucket<K, V>>,
Expand All @@ -552,7 +630,7 @@ pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
guard.defer_unchecked(move || {
atomic::fence(Ordering::Acquire);

if ptr.tag() & TOMBSTONE_TAG == 0 {
if !is_tombstone(ptr) {
ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr());
}

Expand All @@ -565,7 +643,7 @@ pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>(
mut ptr: Shared<'g, Bucket<K, V>>,
) {
assert!(!ptr.is_null());
assert_ne!(ptr.tag() & TOMBSTONE_TAG, 0);
assert!(is_tombstone(ptr));

atomic::fence(Ordering::Acquire);
// read the value now, but defer its destruction for later
Expand Down Expand Up @@ -634,6 +712,21 @@ pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when c
pub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyed
pub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table

#[inline]
pub(crate) fn is_sentinel<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & SENTINEL_TAG != 0
}

#[inline]
pub(crate) fn is_tombstone<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & TOMBSTONE_TAG != 0
}

#[inline]
pub(crate) fn is_borrowed<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {
bucket_ptr.tag() & BORROWED_TAG != 0
}

// #[cfg(test)]
// mod tests {
// use super::*;
Expand Down
Loading

0 comments on commit 8bd97fb

Please sign in to comment.