Skip to content

Commit

Permalink
Support notification on eviction
Browse files Browse the repository at this point in the history
Emit an error log when the user provided eviction listener panics. (Requires
`logging` feature enabled)
  • Loading branch information
tatsuya6502 authored Jul 2, 2022
1 parent e49f0fe commit 0ee3005
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
19 changes: 9 additions & 10 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2449,10 +2449,15 @@ mod tests {
verify_notification_vec(&cache, actual, &expected);
}

// NOTE: To enable the panic logging, run the following command:
//
// RUST_LOG=moka=info cargo test --features 'future, logging' -- \
// future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
//
#[tokio::test]
async fn recover_from_panicking_eviction_listener() {
use futures_util::FutureExt;
use std::panic::AssertUnwindSafe;
#[cfg(feature = "logging")]
let _ = env_logger::builder().is_test(true).try_init();

// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
Expand Down Expand Up @@ -2485,14 +2490,8 @@ mod tests {
cache.sync();

// Insert an okay value. This will replace the previsous
// value "panic now!" so the eviction listener will panick.
match AssertUnwindSafe(cache.insert("alice", "a2"))
.catch_unwind()
.await
{
Ok(()) => (), // pass
r => panic!("Unexpected result: {:?}", r),
}
// value "panic now!" so the eviction listener will panic.
cache.insert("alice", "a2").await;
cache.sync();
// No more removal notification should be sent.

Expand Down
21 changes: 10 additions & 11 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2439,14 +2439,20 @@ mod tests {
}
}

// NOTE: To enable the panic logging, run the following command:
//
// RUST_LOG=moka=info cargo test --features 'logging' -- \
// sync::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
//
#[test]
fn recover_from_panicking_eviction_listener() {
#[cfg(feature = "logging")]
let _ = env_logger::builder().is_test(true).try_init();

run_test(DeliveryMode::Immediate);
run_test(DeliveryMode::Queued);

fn run_test(delivery_mode: DeliveryMode) {
use std::panic::{catch_unwind, AssertUnwindSafe};

// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();
Expand Down Expand Up @@ -2483,15 +2489,8 @@ mod tests {
cache.sync();

// Insert an okay value. This will replace the previsous
// value "panic now!" so the eviction listener will panick.
match catch_unwind(AssertUnwindSafe(|| cache.insert("alice", "a2"))) {
Ok(()) if delivery_mode == DeliveryMode::Queued => (), // pass
Err(_) if delivery_mode == DeliveryMode::Immediate => (), // pass
r => panic!(
"Unexpected result for delivery mode {:?}: {:?}",
delivery_mode, r
),
}
// value "panic now!" so the eviction listener will panic.
cache.insert("alice", "a2");
cache.sync();
// No more removal notification should be sent.

Expand Down
44 changes: 31 additions & 13 deletions src/sync_base/removal_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const MAX_NOTIFICATIONS_PER_TASK: u16 = 5_000;

pub(crate) enum RemovalNotifier<K, V> {
Blocking(BlockingRemovalNotifier<K, V>),
// NonBlocking(NonBlockingRemovalNotifier<K, V>),
ThreadPool(ThreadPoolRemovalNotifier<K, V>),
}

Expand All @@ -40,11 +39,7 @@ impl<K, V> RemovalNotifier<K, V> {
}

pub(crate) fn is_batching_supported(&self) -> bool {
matches!(
self,
// RemovalNotifier::NonBlocking(_) | RemovalNotifier::ThreadPool(_)
RemovalNotifier::ThreadPool(_)
)
matches!(self, RemovalNotifier::ThreadPool(_))
}

pub(crate) fn notify(&self, key: Arc<K>, value: V, cause: RemovalCause)
Expand All @@ -54,7 +49,6 @@ impl<K, V> RemovalNotifier<K, V> {
{
match self {
RemovalNotifier::Blocking(notifier) => notifier.notify(key, value, cause),
// RemovalNotifier::NonBlocking(_) => todo!(),
RemovalNotifier::ThreadPool(notifier) => {
notifier.add_single_notification(key, value, cause)
}
Expand All @@ -68,7 +62,6 @@ impl<K, V> RemovalNotifier<K, V> {
{
match self {
RemovalNotifier::Blocking(_) => unreachable!(),
// RemovalNotifier::NonBlocking(_) => todo!(),
RemovalNotifier::ThreadPool(notifier) => notifier.add_multiple_notifications(entries),
}
}
Expand All @@ -80,7 +73,6 @@ impl<K, V> RemovalNotifier<K, V> {
{
match self {
RemovalNotifier::Blocking(_) => unreachable!(),
// RemovalNotifier::NonBlocking(_) => todo!(),
RemovalNotifier::ThreadPool(notifier) => notifier.submit_task(),
}
}
Expand All @@ -100,7 +92,7 @@ impl<K, V> BlockingRemovalNotifier<K, V> {
}

fn notify(&self, key: Arc<K>, value: V, cause: RemovalCause) {
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::panic::{catch_unwind, AssertUnwindSafe};

if !self.is_enabled.load(Ordering::Acquire) {
return;
Expand All @@ -111,9 +103,10 @@ impl<K, V> BlockingRemovalNotifier<K, V> {
// Safety: It is safe to assert unwind safety here because we will not
// call the listener again if it has been panicked.
let result = catch_unwind(AssertUnwindSafe(listener_clo));
if let Err(payload) = result {
if let Err(_payload) = result {
self.is_enabled.store(false, Ordering::Release);
resume_unwind(payload);
#[cfg(feature = "logging")]
log_panic(&*_payload);
}
}
}
Expand Down Expand Up @@ -295,7 +288,14 @@ impl<K, V> NotificationTask<K, V> {

// Safety: It is safe to assert unwind safety here because we will not
// call the listener again if it has been panicked.
catch_unwind(AssertUnwindSafe(listener_clo))
let result = catch_unwind(AssertUnwindSafe(listener_clo));
#[cfg(feature = "logging")]
{
if let Err(payload) = &result {
log_panic(&**payload);
}
}
result
}
}

Expand Down Expand Up @@ -360,3 +360,21 @@ impl<K, V> RemovedEntries<K, V> {
Self::Multi(entries)
}
}

#[cfg(feature = "logging")]
fn log_panic(payload: &(dyn std::any::Any + Send + 'static)) {
let message: Option<std::borrow::Cow<'_, str>> = if let Some(s) = payload.downcast_ref::<&str>()
{
Some((*s).into())
} else if let Some(s) = payload.downcast_ref::<String>() {
Some(s.into())
} else {
None
};

if let Some(m) = message {
log::error!("Eviction listener panicked at '{}'", m);
} else {
log::error!("Eviction listener panicked");
}
}

0 comments on commit 0ee3005

Please sign in to comment.