Skip to content

Commit

Permalink
docs: Document the slot module
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalmarhubi committed Jun 3, 2016
1 parent 2086082 commit ee71210
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod lock;
/// A slot in memory for communicating between a producer and a consumer.
mod slot;
mod util;

Expand Down
76 changes: 73 additions & 3 deletions src/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,34 @@ use lock::Lock;
/// A slot in memory intended to represent the communication channel between one
/// producer and one consumer.
///
/// Each slot contains space for a piece of data, `T`, and space for callbacks.
/// The callbacks can be run when the data is either full or empty.
/// Each slot contains space for a piece of data of type `T`, and can have callbacks
/// registered to run when the slot is either full or empty.
///
/// # Registering callbacks
///
/// [`on_empty`](#method.on_empty) registers a callback to run when the slot
/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
/// becomes full. In both cases, the callback will run immediately if possible.
///
/// At most one callback can be registered at any given time: it is an error to
/// attempt to register a callback with `on_full` if one is currently registered
/// via `on_empty`, or any other combination.
///
/// # Cancellation
///
/// Registering a callback returns a `Token` which can be used to
/// [`cancel`](#method.cancel) the callback. Cancellation is best-effort:
/// cancelling a callback that has already run is not an error, and does not
/// signal this to the caller of `cancel`.
pub struct Slot<T> {
state: AtomicUsize,
slot: Lock<Option<T>>,
on_full: Lock<Option<Box<FnBox<T>>>>,
on_empty: Lock<Option<Box<FnBox<T>>>>,
}

/// Error value returned from erroneous calls to `try_produce`.
/// Error value returned from erroneous calls to `try_produce`, which contains
/// the value that was passed to `try_produce`.
#[derive(Debug, PartialEq)]
pub struct TryProduceError<T>(T);

Expand All @@ -30,8 +48,17 @@ pub struct OnFullError(());
#[derive(Debug, PartialEq)]
pub struct OnEmptyError(());

/// A `Token` represents a registered callback, and can be used to cancel the callback.
pub struct Token(usize);

/// Slot state: the lowest 3 bits are flags; the remaining bits are used to
/// store the `Token` for the currently registered callback. The special token
/// value 0 means no callback is registered.
///
/// The flags are:
/// - `DATA`: the `Slot` contains a value
/// - `ON_FULL`: the `Slot` has an `on_full` callback registered
/// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered
struct State(usize);

fn _is_send<T: Send>() {}
Expand All @@ -49,6 +76,9 @@ const STATE_BITS: usize = 3;
const STATE_MASK: usize = (1 << STATE_BITS) - 1;

impl<T: 'static> Slot<T> {
/// Creates a new `Slot` containing `val`, which may be `None` to create an
/// empty `Slot`.
// TODO: Separate this into `new`, and `with_value` functions?
pub fn new(val: Option<T>) -> Slot<T> {
Slot {
state: AtomicUsize::new(if val.is_some() {DATA} else {0}),
Expand All @@ -59,6 +89,14 @@ impl<T: 'static> Slot<T> {
}

// PRODUCER
/// Attempts to store `t` in the slot.
///
/// This method is to be called by the producer.
///
/// # Errors
///
/// Returns `Err` if the slot is already full. The value you attempted to
/// store is included in the error value.
pub fn try_produce(&self, t: T) -> Result<(), TryProduceError<T>> {
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_EMPTY));
Expand Down Expand Up @@ -90,15 +128,26 @@ impl<T: 'static> Slot<T> {
}

// PRODUCER
/// Registers `f` as a callback to run when the slot becomes empty. The
/// callback will run immediately if the slot is already empty.
///
/// This method is to be called by the producer.
///
/// # Panics
///
/// Panics if another callback was already registered via `on_empty` or
/// `on_full`.
pub fn on_empty<F>(&self, f: F) -> Token
where F: FnOnce(&Slot<T>) + Send + 'static
{
let mut state = State(self.state.load(Ordering::SeqCst));
// TODO: return OnEmptyError instead of assert.
assert!(!state.flag(ON_EMPTY));
if !state.flag(DATA) {
f(self);
return Token(0)
}
// TODO: return OnEmptyError instead of assert?
assert!(!state.flag(ON_FULL));
let mut slot = self.on_empty.try_lock().expect("on_empty interference");
assert!(slot.is_none());
Expand Down Expand Up @@ -129,6 +178,13 @@ impl<T: 'static> Slot<T> {
}

// CONSUMER
/// Attempts to consume the value stored in the slot.
///
/// This method is to be called by the consumer.
///
/// # Errors
///
/// Returns `Err` if the slot is already empty.
pub fn try_consume(&self) -> Result<T, TryConsumeError> {
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_FULL));
Expand Down Expand Up @@ -160,15 +216,26 @@ impl<T: 'static> Slot<T> {
}

// CONSUMER
/// Registers `f` as a callback to run when the slot becomes full. The
/// callback will run immediately if the slot is already full.
///
/// This method is to be called by the consumer.
///
/// # Panics
///
/// Panics if another callback was already registered via `on_empty` or
/// `on_full`.
pub fn on_full<F>(&self, f: F) -> Token
where F: FnOnce(&Slot<T>) + Send + 'static
{
let mut state = State(self.state.load(Ordering::SeqCst));
// TODO: return OnFullError instead of assert.
assert!(!state.flag(ON_FULL));
if state.flag(DATA) {
f(self);
return Token(0)
}
// TODO: return OnFullError instead of assert?
assert!(!state.flag(ON_EMPTY));
let mut slot = self.on_full.try_lock().expect("on_full interference");
assert!(slot.is_none());
Expand Down Expand Up @@ -198,6 +265,8 @@ impl<T: 'static> Slot<T> {
}
}

/// Cancels the callback associated with `token`. Returns successfully even
/// if the callback had already run; see [Cancellation](#cancellation).
pub fn cancel(&self, token: Token) {
let token = token.0;
if token == 0 {
Expand Down Expand Up @@ -241,6 +310,7 @@ impl<T: 'static> Slot<T> {
}

impl<T> TryProduceError<T> {
/// Extracts the value that was attempted to be produced.
pub fn into_inner(self) -> T {
self.0
}
Expand Down

0 comments on commit ee71210

Please sign in to comment.