Skip to content

Commit

Permalink
Add documentation to the queuing runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Nov 6, 2024
1 parent 6c2bc19 commit a32b0d6
Showing 1 changed file with 55 additions and 4 deletions.
59 changes: 55 additions & 4 deletions src/queuing_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ use crate::core::Specification;
use crate::core::StreamData;
use crate::core::{OutputStream, StreamContext, VarName};

/*
* A StreamContext that track the history of each of the variables as a queue
* (Vec) of values, and provides this as an asynchronous stream when requested
* by the monitoring semantics.
*
* This includes:
* - queues: a map from variable names to the underlying queues
* - input_streams: a map from variable names to the initial input streams
* - output_streams: a map from variable names to the streams that are lazily
* provided to the context
* - production_locks: a map from variable names to locks that are used to
* ensure that only one variable is updated at a time
*
* An QueuingVarContext is created with a list of variable names, the map of
* input streams, and a map of (lazily provided) output streams.
*
* The Val type parameter is the type of the values contained in each
* variable/stream.
*
* Note that currently this implementation does no garbage collection of
* historical values, and so the queues will grow indefinitely.
*/
struct QueuingVarContext<Val: StreamData> {
queues: BTreeMap<VarName, Arc<Mutex<Vec<Val>>>>,
input_streams: BTreeMap<VarName, Arc<Mutex<OutputStream<Val>>>>,
Expand Down Expand Up @@ -47,7 +69,11 @@ impl<Val: StreamData> QueuingVarContext<Val> {
}
}

// A stream that is either already arrived or is waiting to be lazily supplied
// A stream that is either already arrived (the Arrived state) or is waiting to
// be lazily supplied via a watch channel (the Waiting state).
//
// The get_stream function is used to wait until the stream is available and
// then return it.
enum WaitingStream<S> {
Arrived(Arc<Mutex<S>>),
Waiting(tokio::sync::watch::Receiver<Option<Arc<Mutex<S>>>>),
Expand Down Expand Up @@ -82,6 +108,14 @@ impl<S> WaitingStream<S> {
}
}

/*
* This function creates a buffered asynchronous stream based on an underlying
* shared queue of known values xs and a (lazily provided) stream
* waiting_stream.
*
* The returned stream will produce values from xs if they are available, or if
* not, will produce values from waiting stream
*/
fn queue_buffered_stream<V: StreamData>(
xs: Arc<Mutex<Vec<V>>>,
waiting_stream: WaitingStream<OutputStream<V>>,
Expand Down Expand Up @@ -122,6 +156,9 @@ fn queue_buffered_stream<V: StreamData>(
))
}

/*
* Implement StreamContext for QueuingVarContext
*/
impl<Val: StreamData> StreamContext<Val> for Arc<QueuingVarContext<Val>> {
fn var(&self, var: &VarName) -> Option<OutputStream<Val>> {
let queue = self.queues.get(var)?;
Expand Down Expand Up @@ -151,11 +188,13 @@ impl<Val: StreamData> StreamContext<Val> for Arc<QueuingVarContext<Val>> {
}
}

/*
* A subcontext that provides a view of the parent context with a limited
* history length.
*/
struct SubMonitor<Val: StreamData> {
parent: Arc<QueuingVarContext<Val>>,
#[allow(dead_code)]
// TODO: implement restricting subcontexts to a certain history length;
// this is currently not implemented by the queuing runtime
buffer_size: usize,
index: Arc<StdMutex<usize>>,
}
Expand Down Expand Up @@ -192,6 +231,19 @@ impl<Val: StreamData> StreamContext<Val> for SubMonitor<Val> {
}
}

/*
* A Monitor instance implementing the Queuing Runtime.
*
* This runtime uses a queue-based approach to track the history of each
* variable (based on the QueuingVarContext var_exchange) and provide the
* history as asynchronous streams to the monitoring semantics.
*
* - The Expr type parameter is the type of the expressions in the model.
* - The Val type parameter is the type of the values used in the channels.
* - The S type parameter is the monitoring semantics used to evaluate the
* expressions as streams.
* - The M type parameter is the model/specification being monitored.
*/
pub struct QueuingMonitorRunner<Expr, Val, S, M>
where
Val: StreamData,
Expand All @@ -200,7 +252,6 @@ where
{
model: M,
var_exchange: Arc<QueuingVarContext<Val>>,
// phantom_ts: PhantomData<TS>,
semantics_t: PhantomData<S>,
expr_t: PhantomData<Expr>,
}
Expand Down

0 comments on commit a32b0d6

Please sign in to comment.