Skip to content

Commit

Permalink
Merge pull request #224 from moka-rs/refactor-sync-value-initializer
Browse files Browse the repository at this point in the history
Refactor an internal module `sync::value_initializer` to match the new
structure of future::value_initializer.
  • Loading branch information
tatsuya6502 authored Feb 7, 2023
2 parents 058f839 + 0fe3b5f commit 142783e
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 133 deletions.
6 changes: 3 additions & 3 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ where

match self
.value_initializer
.try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
.await
{
InitResult::Initialized(v) => {
Expand Down Expand Up @@ -1623,7 +1623,7 @@ where

match self
.value_initializer
.try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
.await
{
InitResult::Initialized(v) => {
Expand Down Expand Up @@ -1706,7 +1706,7 @@ where

match self
.value_initializer
.try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
.await
{
InitResult::Initialized(v) => {
Expand Down
10 changes: 5 additions & 5 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ where
init: Pin<&mut impl Future<Output = O>>,
// Closure that returns a future to insert a new value into cache.
mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a,
// This function will be called after the init future has returned a value of
// type O. It converts O into Result<V, E>.
// Function to convert a value O, returned from the init future, into
// Result<V, E>.
post_init: fn(O) -> Result<V, E>,
) -> InitResult<V, E>
where
Expand Down Expand Up @@ -180,9 +180,9 @@ where
return InitResult::ReadExisting(value);
}

// The value still does note exist. Let's resolve the init future.

// Catching panic is safe here as we do not try to resolve the future again.
// The value still does note exist. Let's resolve the init
// future. Catching panic is safe here as we do not try to
// resolve the future again.
match AssertUnwindSafe(init).catch_unwind().await {
// Resolved.
Ok(value) => {
Expand Down
2 changes: 1 addition & 1 deletion src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ pub trait ConcurrentCacheExt<K, V> {

// Empty internal struct to be used in optionally_get_with to represent the None
// results.
struct OptionallyNone;
pub(crate) struct OptionallyNone;
15 changes: 12 additions & 3 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,12 @@ where
None
};

let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;

match self
.value_initializer
.init_or_read(Arc::clone(&key), get, init, insert)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Expand Down Expand Up @@ -1375,9 +1378,12 @@ where
None
};

let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;

match self
.value_initializer
.optionally_init_or_read(Arc::clone(&key), get, init, insert)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Expand Down Expand Up @@ -1567,9 +1573,12 @@ where
None
};

let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;

match self
.value_initializer
.try_init_or_read(Arc::clone(&key), get, init, insert)
.try_init_or_read(&key, type_id, get, init, insert, post_init)
{
InitResult::Initialized(v) => {
crossbeam_epoch::pin().flush();
Expand Down
184 changes: 63 additions & 121 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,117 +45,19 @@ where

/// # Panics
/// Panics if the `init` closure has been panicked.
pub(crate) fn init_or_read(
&self,
key: Arc<K>,
// Closure to get an existing value from cache.
get: impl FnMut() -> Option<V>,
init: impl FnOnce() -> V,
// Closure to insert a new value into cache.
mut insert: impl FnMut(V),
) -> InitResult<V, ()> {
// This closure will be called before the init closure is called, in order to
// check if the value has already been inserted by other thread.
let pre_init = make_pre_init(get);

// This closure will be called after the init closure has returned a value.
// It will insert the returned value (from init) to the cache, and convert
// the value into a pair of a WaiterValue and an InitResult.
let post_init = |value: V| {
insert(value.clone());
(Some(Ok(value.clone())), InitResult::Initialized(value))
};

let type_id = TypeId::of::<()>();
self.do_try_init(&key, type_id, pre_init, init, post_init)
}

/// # Panics
/// Panics if the `init` closure has been panicked.
pub(crate) fn try_init_or_read<E>(
&self,
key: Arc<K>,
get: impl FnMut() -> Option<V>,
init: impl FnOnce() -> Result<V, E>,
mut insert: impl FnMut(V),
) -> InitResult<V, E>
where
E: Send + Sync + 'static,
{
let type_id = TypeId::of::<E>();

// This closure will be called before the init closure is called, in order to
// check if the value has already been inserted by other thread.
let pre_init = make_pre_init(get);

// This closure will be called after the init closure has returned a value.
// It will insert the returned value (from init) to the cache, and convert
// the value into a pair of a WaiterValue and an InitResult.
let post_init = |value: Result<V, E>| match value {
Ok(value) => {
insert(value.clone());
(Some(Ok(value.clone())), InitResult::Initialized(value))
}
Err(e) => {
let err: ErrorObject = Arc::new(e);
(
Some(Err(Arc::clone(&err))),
InitResult::InitErr(err.downcast().unwrap()),
)
}
};

self.do_try_init(&key, type_id, pre_init, init, post_init)
}

/// # Panics
/// Panics if the `init` closure has been panicked.
pub(super) fn optionally_init_or_read(
&self,
key: Arc<K>,
get: impl FnMut() -> Option<V>,
init: impl FnOnce() -> Option<V>,
mut insert: impl FnMut(V),
) -> InitResult<V, OptionallyNone> {
let type_id = TypeId::of::<OptionallyNone>();

// This closure will be called before the init closure is called, in order to
// check if the value has already been inserted by other thread.
let pre_init = make_pre_init(get);

// This closure will be called after the init closure has returned a value.
// It will insert the returned value (from init) to the cache, and convert
// the value into a pair of a WaiterValue and an InitResult.
let post_init = |value: Option<V>| match value {
Some(value) => {
insert(value.clone());
(Some(Ok(value.clone())), InitResult::Initialized(value))
}
None => {
// `value` can be either `Some` or `None`. For `None` case, without
// change the existing API too much, we will need to convert `None`
// to Arc<E> here. `Infallible` could not be instantiated. So it
// might be good to use an empty struct to indicate the error type.
let err: ErrorObject = Arc::new(OptionallyNone);
(
Some(Err(Arc::clone(&err))),
InitResult::InitErr(err.downcast().unwrap()),
)
}
};

self.do_try_init(&key, type_id, pre_init, init, post_init)
}

/// # Panics
/// Panics if the `init` closure has been panicked.
fn do_try_init<O, E>(
pub(crate) fn try_init_or_read<O, E>(
&self,
key: &Arc<K>,
type_id: TypeId,
mut pre_init: impl FnMut() -> Option<(WaiterValue<V>, InitResult<V, E>)>,
// Closure to get an existing value from cache.
mut get: impl FnMut() -> Option<V>,
// Closure to initialize a new value.
init: impl FnOnce() -> O,
mut post_init: impl FnMut(O) -> (WaiterValue<V>, InitResult<V, E>),
// Closure to insert a new value into cache.
mut insert: impl FnMut(V),
// Function to convert a value O, returned from the init future, into
// Result<V, E>.
post_init: fn(O) -> Result<V, E>,
) -> InitResult<V, E>
where
E: Send + Sync + 'static,
Expand All @@ -176,12 +78,12 @@ where
None => {
// Our waiter was inserted.
// Check if the value has already been inserted by other thread.
if let Some((waiter_val, init_res)) = pre_init() {
if let Some(value) = get() {
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
*lock = waiter_val;
*lock = Some(Ok(value.clone()));
self.remove_waiter(cht_key, hash);
return init_res;
return InitResult::ReadExisting(value);
}

// The value still does note exist. Let's evaluate the init
Expand All @@ -190,7 +92,19 @@ where
match catch_unwind(AssertUnwindSafe(init)) {
// Evaluated.
Ok(value) => {
let (waiter_val, init_res) = post_init(value);
let (waiter_val, init_res) = match post_init(value) {
Ok(value) => {
insert(value.clone());
(Some(Ok(value.clone())), InitResult::Initialized(value))
}
Err(e) => {
let err: ErrorObject = Arc::new(e);
(
Some(Err(Arc::clone(&err))),
InitResult::InitErr(err.downcast().unwrap()),
)
}
};
*lock = waiter_val;
self.remove_waiter(cht_key, hash);
return init_res;
Expand Down Expand Up @@ -231,6 +145,44 @@ where
}
}

/// The `post_init` function for the `get_with` method of cache.
pub(crate) fn post_init_for_get_with(value: V) -> Result<V, ()> {
Ok(value)
}

/// The `post_init` function for the `optionally_get_with` method of cache.
pub(crate) fn post_init_for_optionally_get_with(
value: Option<V>,
) -> Result<V, Arc<OptionallyNone>> {
// `value` can be either `Some` or `None`. For `None` case, without change
// the existing API too much, we will need to convert `None` to Arc<E> here.
// `Infallible` could not be instantiated. So it might be good to use an
// empty struct to indicate the error type.
value.ok_or(Arc::new(OptionallyNone))
}

/// The `post_init` function for `try_get_with` method of cache.
pub(crate) fn post_init_for_try_get_with<E>(result: Result<V, E>) -> Result<V, E> {
result
}

/// Returns the `type_id` for `get_with` method of cache.
pub(crate) fn type_id_for_get_with() -> TypeId {
// NOTE: We use a regular function here instead of a const fn because TypeId
// is not stable as a const fn. (as of our MSRV)
TypeId::of::<()>()
}

/// Returns the `type_id` for `optionally_get_with` method of cache.
pub(crate) fn type_id_for_optionally_get_with() -> TypeId {
TypeId::of::<OptionallyNone>()
}

/// Returns the `type_id` for `try_get_with` method of cache.
pub(crate) fn type_id_for_try_get_with<E: 'static>() -> TypeId {
TypeId::of::<E>()
}

#[inline]
fn remove_waiter(&self, cht_key: (Arc<K>, TypeId), hash: u64) {
self.waiters.remove(hash, |k| k == &cht_key);
Expand All @@ -254,13 +206,3 @@ where
(cht_key, hash)
}
}

#[inline]
fn make_pre_init<V, E>(
mut get: impl FnMut() -> Option<V>,
) -> impl FnMut() -> Option<(WaiterValue<V>, InitResult<V, E>)>
where
V: Clone,
{
move || get().map(|value| (Some(Ok(value.clone())), InitResult::ReadExisting(value)))
}

0 comments on commit 142783e

Please sign in to comment.