Skip to content

Commit

Permalink
Merge pull request #265 from Swatinem/probe-iter
Browse files Browse the repository at this point in the history
Remove `probe_loop` in favor of Iterator
  • Loading branch information
tatsuya6502 authored Jun 7, 2023
2 parents 0c04d55 + da251c4 commit 90fa847
Showing 1 changed file with 104 additions and 109 deletions.
213 changes: 104 additions & 109 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,31 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
hash: u64,
mut eq: impl FnMut(&K) -> bool,
) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError> {
let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| {
for bucket in self.probe(guard, hash) {
let Ok((_, _, this_bucket_ptr)) = bucket else { return Err(RelocatedError); };

let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Not found.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
};

if !eq(&this_bucket_ref.key) {
// Different key. Try next bucket
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
// Not found. (It has been removed)
ProbeLoopAction::Return(Shared::null())
return Ok(Shared::null());
} else {
// Found.
ProbeLoopAction::Return(this_bucket_ptr)
return Ok(this_bucket_ptr);
}
});

match loop_result {
ProbeLoopResult::Returned(t) => Ok(t),
ProbeLoopResult::LoopEnded => Ok(Shared::null()),
ProbeLoopResult::FoundSentinelTag => Err(RelocatedError),
}

Ok(Shared::null())
}

pub(crate) fn remove_if<F>(
Expand All @@ -121,31 +119,34 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
where
F: FnMut(&K, &V) -> bool,
{
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((_, this_bucket, this_bucket_ptr)) = bucket else { return Err(condition); };

let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Nothing to remove.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
};

let this_key = &this_bucket_ref.key;

if !eq(this_key) {
// Different key. Try next bucket.
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
// Already removed.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
}

let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

if !condition(this_key, this_value) {
// Found but the condition is false. Do not remove.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
}

// Found and the condition is true. Remove it. (Make it a tombstone)
Expand All @@ -160,45 +161,35 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard,
) {
// Succeeded. Return the removed value. (can be null)
Ok(_) => ProbeLoopAction::Return(new_bucket_ptr),
Ok(_) => return Ok(new_bucket_ptr),
// Failed. Reload to retry.
Err(_) => ProbeLoopAction::Reload,
Err(_) => probe.reload(),
}
});

match loop_result {
ProbeLoopResult::Returned(t) => Ok(t),
ProbeLoopResult::LoopEnded => Ok(Shared::null()),
ProbeLoopResult::FoundSentinelTag => Err(condition),
}

Ok(Shared::null())
}

pub(crate) fn insert_if_not_present<F>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut state: InsertOrModifyState<K, V, F>,
) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
where
F: FnOnce() -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();

let mut probe = self.probe(guard, hash);
while let Some(Ok((_, this_bucket, this_bucket_ptr))) = probe.next() {
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
if &this_bucket_ref.key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
continue;
}

if !is_tombstone(this_bucket_ptr) {
// Found. Return it.
return ProbeLoopAction::Return(InsertionResult::AlreadyPresent(
this_bucket_ptr,
));
return Ok(InsertionResult::AlreadyPresent(this_bucket_ptr));
}
}

Expand All @@ -213,18 +204,18 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
Ordering::Relaxed,
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));
ProbeLoopAction::Reload
state = InsertOrModifyState::from_bucket_value(new, None);
probe.reload();
} else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
// Inserted by replacing a tombstone.
ProbeLoopAction::Return(InsertionResult::ReplacedTombstone(this_bucket_ptr))
return Ok(InsertionResult::ReplacedTombstone(this_bucket_ptr));
} else {
// Inserted.
ProbeLoopAction::Return(InsertionResult::Inserted)
return Ok(InsertionResult::Inserted);
}
});
}

loop_result.returned().ok_or_else(|| maybe_state.unwrap())
Err(state)
}

// https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
Expand All @@ -233,26 +224,24 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut state: InsertOrModifyState<K, V, F>,
mut modifier: G,
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((_, this_bucket, this_bucket_ptr)) = bucket else { return Err((state, modifier)); };

let (new_bucket, maybe_insert_value) =
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
let this_key = &this_bucket_ref.key;

if this_key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
Expand Down Expand Up @@ -280,20 +269,15 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard,
) {
// Failed. Reload to retry.
maybe_state = Some(InsertOrModifyState::from_bucket_value(
new,
maybe_insert_value,
));
ProbeLoopAction::Reload
state = InsertOrModifyState::from_bucket_value(new, maybe_insert_value);
probe.reload();
} else {
// Succeeded. Return the previous value. (can be null)
ProbeLoopAction::Return(this_bucket_ptr)
return Ok(this_bucket_ptr);
}
});
}

loop_result
.returned()
.ok_or_else(|| (maybe_state.unwrap(), modifier))
Err((state, modifier))
}

fn insert_for_grow(
Expand All @@ -308,19 +292,22 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let key = &unsafe { bucket_ptr.deref() }.key;

let loop_result = self.probe_loop(guard, hash, |i, this_bucket, this_bucket_ptr| {
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((i, this_bucket, this_bucket_ptr)) = bucket else { return None; };

if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } {
if this_bucket_ptr == bucket_ptr {
return ProbeLoopAction::Return(None);
return None;
} else if this_key != key {
return ProbeLoopAction::Continue;
continue;
} else if !is_borrowed(this_bucket_ptr) {
return ProbeLoopAction::Return(None);
return None;
}
}

if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
return None;
} else if this_bucket
.compare_exchange_weak(
this_bucket_ptr,
Expand All @@ -331,13 +318,13 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
)
.is_ok()
{
ProbeLoopAction::Return(Some(i))
return Some(i);
} else {
ProbeLoopAction::Reload
probe.reload();
}
});
}

loop_result.returned().flatten()
None
}

pub(crate) fn keys<F, T>(
Expand Down Expand Up @@ -368,34 +355,63 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe_loop<F, T>(&self, guard: &'g Guard, hash: u64, mut f: F) -> ProbeLoopResult<T>
where
F: FnMut(usize, &Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>) -> ProbeLoopAction<T>,
{
let offset = hash as usize & (self.buckets.len() - 1);
struct Probe<'b, 'g, K: 'g, V: 'g> {
buckets: &'b [Atomic<Bucket<K, V>>],
guard: &'g Guard,
this_bucket: (usize, &'b Atomic<Bucket<K, V>>),
offset: usize,

for i in
(0..self.buckets.len()).map(|i| (i.wrapping_add(offset)) & (self.buckets.len() - 1))
{
let this_bucket = &self.buckets[i];
i: usize,
reload: bool,
}

loop {
let this_bucket_ptr = this_bucket.load_consume(guard);
impl<'b, 'g, K: 'g, V: 'g> Probe<'b, 'g, K, V> {
fn reload(&mut self) {
self.reload = true;
}
}

if is_sentinel(this_bucket_ptr) {
return ProbeLoopResult::FoundSentinelTag;
}
impl<'b, 'g, K: 'g, V: 'g> Iterator for Probe<'b, 'g, K, V> {
type Item = Result<(usize, &'b Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>), ()>;

match f(i, this_bucket, this_bucket_ptr) {
ProbeLoopAction::Continue => break,
ProbeLoopAction::Reload => (),
ProbeLoopAction::Return(t) => return ProbeLoopResult::Returned(t),
}
fn next(&mut self) -> Option<Self::Item> {
if !self.reload {
let max = self.buckets.len() - 1;
if self.i >= max {
return None;
}
self.i += 1;
let i = self.i.wrapping_add(self.offset) & max;
self.this_bucket = (i, &self.buckets[i]);
}
self.reload = false;

let this_bucket_ptr = self.this_bucket.1.load_consume(self.guard);

ProbeLoopResult::LoopEnded
if is_sentinel(this_bucket_ptr) {
return Some(Err(()));
}

let val = (self.this_bucket.0, self.this_bucket.1, this_bucket_ptr);
Some(Ok(val))
}
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe(&self, guard: &'g Guard, hash: u64) -> Probe<'_, 'g, K, V> {
let buckets = &self.buckets;
let offset = hash as usize & (buckets.len() - 1);
// FIXME: this will panic if `len() == 0`
let this_bucket = (offset, &buckets[offset]);
Probe {
buckets,
guard,
this_bucket,
offset,

i: 0,
reload: true,
}
}

pub(crate) fn rehash<H>(
Expand Down Expand Up @@ -644,27 +660,6 @@ where
hasher.finish()
}

enum ProbeLoopAction<T> {
Continue,
Reload,
Return(T),
}

enum ProbeLoopResult<T> {
LoopEnded,
FoundSentinelTag,
Returned(T),
}

impl<T> ProbeLoopResult<T> {
fn returned(self) -> Option<T> {
match self {
Self::Returned(t) => Some(t),
Self::LoopEnded | Self::FoundSentinelTag => None,
}
}
}

pub(crate) enum InsertionResult<'g, K, V> {
AlreadyPresent(Shared<'g, Bucket<K, V>>),
Inserted,
Expand Down

0 comments on commit 90fa847

Please sign in to comment.