From a32b0d6f28c5252e5eb0b976b320bc1cbafcd310 Mon Sep 17 00:00:00 2001 From: Thomas Wright Date: Wed, 6 Nov 2024 14:23:20 +0100 Subject: [PATCH] Add documentation to the queuing runtime --- src/queuing_runtime.rs | 59 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/src/queuing_runtime.rs b/src/queuing_runtime.rs index c467257..ade4cb2 100644 --- a/src/queuing_runtime.rs +++ b/src/queuing_runtime.rs @@ -17,6 +17,28 @@ use crate::core::Specification; use crate::core::StreamData; use crate::core::{OutputStream, StreamContext, VarName}; +/* + * A StreamContext that track the history of each of the variables as a queue + * (Vec) of values, and provides this as an asynchronous stream when requested + * by the monitoring semantics. + * + * This includes: + * - queues: a map from variable names to the underlying queues + * - input_streams: a map from variable names to the initial input streams + * - output_streams: a map from variable names to the streams that are lazily + * provided to the context + * - production_locks: a map from variable names to locks that are used to + * ensure that only one variable is updated at a time + * + * An QueuingVarContext is created with a list of variable names, the map of + * input streams, and a map of (lazily provided) output streams. + * + * The Val type parameter is the type of the values contained in each + * variable/stream. + * + * Note that currently this implementation does no garbage collection of + * historical values, and so the queues will grow indefinitely. + */ struct QueuingVarContext { queues: BTreeMap>>>, input_streams: BTreeMap>>>, @@ -47,7 +69,11 @@ impl QueuingVarContext { } } -// A stream that is either already arrived or is waiting to be lazily supplied +// A stream that is either already arrived (the Arrived state) or is waiting to +// be lazily supplied via a watch channel (the Waiting state). +// +// The get_stream function is used to wait until the stream is available and +// then return it. enum WaitingStream { Arrived(Arc>), Waiting(tokio::sync::watch::Receiver>>>), @@ -82,6 +108,14 @@ impl WaitingStream { } } +/* + * This function creates a buffered asynchronous stream based on an underlying + * shared queue of known values xs and a (lazily provided) stream + * waiting_stream. + * + * The returned stream will produce values from xs if they are available, or if + * not, will produce values from waiting stream + */ fn queue_buffered_stream( xs: Arc>>, waiting_stream: WaitingStream>, @@ -122,6 +156,9 @@ fn queue_buffered_stream( )) } +/* + * Implement StreamContext for QueuingVarContext + */ impl StreamContext for Arc> { fn var(&self, var: &VarName) -> Option> { let queue = self.queues.get(var)?; @@ -151,11 +188,13 @@ impl StreamContext for Arc> { } } +/* + * A subcontext that provides a view of the parent context with a limited + * history length. + */ struct SubMonitor { parent: Arc>, #[allow(dead_code)] - // TODO: implement restricting subcontexts to a certain history length; - // this is currently not implemented by the queuing runtime buffer_size: usize, index: Arc>, } @@ -192,6 +231,19 @@ impl StreamContext for SubMonitor { } } +/* + * A Monitor instance implementing the Queuing Runtime. + * + * This runtime uses a queue-based approach to track the history of each + * variable (based on the QueuingVarContext var_exchange) and provide the + * history as asynchronous streams to the monitoring semantics. + * + * - The Expr type parameter is the type of the expressions in the model. + * - The Val type parameter is the type of the values used in the channels. + * - The S type parameter is the monitoring semantics used to evaluate the + * expressions as streams. + * - The M type parameter is the model/specification being monitored. + */ pub struct QueuingMonitorRunner where Val: StreamData, @@ -200,7 +252,6 @@ where { model: M, var_exchange: Arc>, - // phantom_ts: PhantomData, semantics_t: PhantomData, expr_t: PhantomData, }