Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,29 @@ impl<T: Clone + Eq> Watchable<T> {
}
}

/// Creates a [`LazyDirect`] [`Watcher`], allowing the value to be observed, but not modified.
///
/// The [`LazyDirect`] watcher does not store the current value, making it smaller. If the watchable
/// is dropped, [`LazyDirect::get`] returns `T::default`.
pub fn watch_lazy(&self) -> LazyDirect<T>
where
T: Default,
{
LazyDirect {
epoch: self.shared.state().epoch,
shared: Arc::downgrade(&self.shared),
}
}

/// Creates a [`WeakWatcher`], which is a weak reference to the watchable's shared state.
///
/// It has the size of a single pointer, and can be upgraded to a [`Direct`] or [`LazyDirect`].
pub fn weak_watcher(&self) -> WeakWatcher<T> {
WeakWatcher {
shared: Arc::downgrade(&self.shared),
}
}

/// Returns the currently stored value.
pub fn get(&self) -> T {
self.shared.get()
Expand Down Expand Up @@ -357,6 +380,39 @@ pub trait Watcher: Clone {
}
}

/// A weak reference to a watchable value that can be upgraded to a full watcher later.
#[derive(Debug, Clone)]
pub struct WeakWatcher<T> {
shared: Weak<Shared<T>>,
}

impl<T: Clone + Eq> WeakWatcher<T> {
/// Upgrade to a [`Direct`] watcher, allowing to observe the value.
///
/// Returns `None` if the underlying [`Watchable`] has been dropped.
pub fn upgrade(&self) -> Option<Direct<T>> {
let shared = self.shared.upgrade()?;
let state = shared.state();
Some(Direct {
state,
shared: self.shared.clone(),
})
}
}

impl<T: Clone + Default + Eq> WeakWatcher<T> {
/// Upgrade to a [`LazyDirect`] watcher, allowing to observe the value.
///
/// The [`LazyDirect`] fetches the value on demand, and thus `lazy_upgrade` succeeds
/// even if the underlying watchable has been dropped.
pub fn upgrade_lazy(&self) -> LazyDirect<T> {
LazyDirect {
epoch: 0,
shared: self.shared.clone(),
}
}
}

/// The immediate, direct observer of a [`Watchable`] value.
///
/// This type is mainly used via the [`Watcher`] interface.
Expand Down Expand Up @@ -392,6 +448,48 @@ impl<T: Clone + Eq> Watcher for Direct<T> {
}
}

/// A lazy direct observer of a [`Watchable`] value.
///
/// Other than [`Direct`] it does not store the current value. It needs `T` to implement [`Default`].
/// If the watchable is dropped, [`Self::get`] will return `T::default()`.
///
/// This type is mainly used via the [`Watcher`] interface.
#[derive(Debug, Clone)]
pub struct LazyDirect<T> {
epoch: u64,
shared: Weak<Shared<T>>,
}

impl<T: Clone + Default + Eq> Watcher for LazyDirect<T> {
type Value = T;

fn get(&mut self) -> Self::Value {
if let Some(shared) = self.shared.upgrade() {
let state = shared.state();
self.epoch = state.epoch;
state.value
} else {
T::default()
}
}

fn is_connected(&self) -> bool {
self.shared.upgrade().is_some()
}

fn poll_updated(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::Value, Disconnected>> {
let Some(shared) = self.shared.upgrade() else {
return Poll::Ready(Err(Disconnected));
};
let state = ready!(shared.poll_updated(cx, self.epoch));
self.epoch = state.epoch;
Poll::Ready(Ok(state.value))
}
}

impl<S: Watcher, T: Watcher> Watcher for (S, T) {
type Value = (S::Value, T::Value);

Expand Down Expand Up @@ -1214,4 +1312,23 @@ mod tests {
assert!(!values.is_empty());
}
}

#[test]
fn test_lazy_direct() {
let a = Watchable::new(1u8);
let mut w1 = a.watch_lazy();
let mut w2 = a.watch_lazy();
assert_eq!(w1.get(), 1u8);
assert_eq!(w2.get(), 1u8);
a.set(2u8).unwrap();
assert_eq!(w1.get(), 2u8);
assert_eq!(w2.get(), 2u8);
let mut s1 = w1.stream_updates_only();
a.set(3u8).unwrap();
assert_eq!(n0_future::future::now_or_never(s1.next()), Some(Some(3u8)));
assert_eq!(w2.get(), 3u8);
drop(a);
assert_eq!(n0_future::future::now_or_never(s1.next()), Some(None));
assert_eq!(w2.get(), 0u8);
}
}
Loading