diff --git a/hydroflow/src/scheduled/context.rs b/hydroflow/src/scheduled/context.rs index 029f43736ba4..982277d2b1c0 100644 --- a/hydroflow/src/scheduled/context.rs +++ b/hydroflow/src/scheduled/context.rs @@ -1,12 +1,15 @@ //! Module for the user-facing [`Context`] object. +//! +//! Provides APIs for state and scheduling. use std::any::Any; +use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; use std::ops::DerefMut; use std::pin::Pin; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use web_time::SystemTime; @@ -14,30 +17,38 @@ use super::state::StateHandle; use super::{StateId, SubgraphId}; use crate::scheduled::ticks::TickInstant; -/// The main state of the Hydroflow instance, which is provided as a reference -/// to each operator as it is run. +/// The main state and scheduler of the Hydroflow instance. Provided as the `context` API to each +/// subgraph/operator as it is run. /// -/// As an optimization, each Hydroflow instances stores eactly one Context -/// inline, which allows us to avoid any construction/deconstruction costs. -/// Before the `Context` is provided to a running operator, the `subgraph_id` -/// field must be updated. +/// Each Hydroflow instances stores eactly one Context inline. Before the `Context` is provided to +/// a running operator, the `subgraph_id` field must be updated. pub struct Context { + /// User-facing State API. states: Vec, + /// TODO(mingwei): separate scheduler into its own struct/trait? + /// Index is stratum, value is FIFO queue for that stratum. + pub(super) stratum_queues: Vec>, + /// Receive events, if second arg indicates if it is an external "important" event (true). + pub(super) event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>, + /// If external events or data can justify starting the next tick. + pub(super) can_start_tick: bool, + /// If the events have been received for this tick. + pub(super) events_received_tick: bool, + // TODO(mingwei): as long as this is here, it's impossible to know when all work is done. // Second field (bool) is for if the event is an external "important" event (true). - pub(crate) event_queue_send: UnboundedSender<(SubgraphId, bool)>, + pub(super) event_queue_send: UnboundedSender<(SubgraphId, bool)>, - pub(crate) current_tick: TickInstant, - pub(crate) current_stratum: usize, + pub(super) current_tick: TickInstant, + pub(super) current_stratum: usize, - pub(crate) current_tick_start: SystemTime, - pub(crate) subgraph_last_tick_run_in: Option, + pub(super) current_tick_start: SystemTime, + pub(super) subgraph_last_tick_run_in: Option, /// The SubgraphId of the currently running operator. When this context is - /// not being forwarded to a running operator, this field is (mostly) - /// meaningless. - pub(crate) subgraph_id: SubgraphId, + /// not being forwarded to a running operator, this field is meaningless. + pub(super) subgraph_id: SubgraphId, tasks_to_spawn: Vec + 'static>>>, @@ -206,13 +217,19 @@ impl Context { futures::future::join_all(self.task_join_handles.drain(..)).await; } } -/// Internal APIs. -impl Context { - /// Create a new context for the Hydroflow graph instance, used internally. - pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self { - Context { + +impl Default for Context { + fn default() -> Self { + let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. + let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); + Self { states: Vec::new(), + stratum_queues, + event_queue_recv, + can_start_tick: false, + events_received_tick: false, + event_queue_send, current_stratum: 0, @@ -227,8 +244,19 @@ impl Context { task_join_handles: Vec::new(), } } +} +/// Internal APIs. +impl Context { + /// Makes sure stratum STRATUM is initialized. + pub(super) fn init_stratum(&mut self, stratum: usize) { + if self.stratum_queues.len() <= stratum { + self.stratum_queues + .resize_with(stratum + 1, Default::default); + } + } - pub(crate) fn reset_state_at_end_of_tick(&mut self) { + /// Call this at the end of a tick, + pub(super) fn reset_state_at_end_of_tick(&mut self) { for StateData { state, tick_reset } in self.states.iter_mut() { if let Some(tick_reset) = tick_reset { (tick_reset)(Box::deref_mut(state)); diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 2d4e96962ee2..8c36f3bf7fb2 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -3,7 +3,6 @@ use std::any::Any; use std::borrow::Cow; use std::cell::Cell; -use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; @@ -11,7 +10,6 @@ use hydroflow_lang::diagnostic::{Diagnostic, SerdeSpan}; use hydroflow_lang::graph::HydroflowGraph; use ref_cast::RefCast; use smallvec::SmallVec; -use tokio::sync::mpsc::{self, UnboundedReceiver}; use web_time::SystemTime; use super::context::Context; @@ -26,47 +24,18 @@ use crate::scheduled::ticks::{TickDuration, TickInstant}; use crate::Never; /// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs. +#[derive(Default)] pub struct Hydroflow<'a> { pub(super) subgraphs: Vec>, pub(super) context: Context, handoffs: Vec, - /// TODO(mingwei): separate scheduler into its own struct/trait? - /// Index is stratum, value is FIFO queue for that stratum. - stratum_queues: Vec>, - /// Receive events, if second arg indicates if it is an external "important" event (true). - event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>, - /// If external events or data can justify starting the next tick. - can_start_tick: bool, - /// If the events have been received for this tick. - events_received_tick: bool, - /// See [`Self::meta_graph()`]. meta_graph: Option, /// See [`Self::diagnostics()`]. diagnostics: Option>>, } -impl Default for Hydroflow<'_> { - fn default() -> Self { - let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. - let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); - let context = Context::new(event_queue_send); - Self { - subgraphs: Vec::new(), - context, - handoffs: Vec::new(), - - stratum_queues, - event_queue_recv, - can_start_tick: false, - events_received_tick: false, - - meta_graph: None, - diagnostics: None, - } - } -} /// Methods for [`TeeingHandoff`] teeing and dropping. impl Hydroflow<'_> { @@ -279,7 +248,9 @@ impl<'a> Hydroflow<'a> { let mut work_done = false; - while let Some(sg_id) = self.stratum_queues[self.context.current_stratum].pop_front() { + while let Some(sg_id) = + self.context.stratum_queues[self.context.current_stratum].pop_front() + { work_done = true; { let sg_data = &mut self.subgraphs[sg_id.0]; @@ -305,11 +276,11 @@ impl<'a> Hydroflow<'a> { let succ_sg_data = &self.subgraphs[succ_id.0]; // If we have sent data to the next tick, then we can start the next tick. if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy { - self.can_start_tick = true; + self.context.can_start_tick = true; } // Add subgraph to stratum queue if it is not already scheduled. if !succ_sg_data.is_scheduled.replace(true) { - self.stratum_queues[succ_sg_data.stratum].push_back(succ_id); + self.context.stratum_queues[succ_sg_data.stratum].push_back(succ_id); } } } @@ -332,19 +303,19 @@ impl<'a> Hydroflow<'a> { #[tracing::instrument(level = "trace", skip(self), ret)] pub fn next_stratum(&mut self, current_tick_only: bool) -> bool { tracing::trace!( - events_received_tick = self.events_received_tick, - can_start_tick = self.can_start_tick, + events_received_tick = self.context.events_received_tick, + can_start_tick = self.context.can_start_tick, "Starting `next_stratum` call.", ); if 0 == self.context.current_stratum { // Starting the tick, reset this to `false`. tracing::trace!("Starting tick, setting `can_start_tick = false`."); - self.can_start_tick = false; + self.context.can_start_tick = false; self.context.current_tick_start = SystemTime::now(); // Ensure external events are received before running the tick. - if !self.events_received_tick { + if !self.context.events_received_tick { // Add any external jobs to ready queue. self.try_recv_events(); } @@ -361,7 +332,7 @@ impl<'a> Hydroflow<'a> { ); // If current stratum has work, return true. - if !self.stratum_queues[self.context.current_stratum].is_empty() { + if !self.context.stratum_queues[self.context.current_stratum].is_empty() { tracing::trace!( tick = u64::from(self.context.current_tick), stratum = self.context.current_stratum, @@ -372,9 +343,9 @@ impl<'a> Hydroflow<'a> { // Increment stratum counter. self.context.current_stratum += 1; - if self.context.current_stratum >= self.stratum_queues.len() { + if self.context.current_stratum >= self.context.stratum_queues.len() { tracing::trace!( - can_start_tick = self.can_start_tick, + can_start_tick = self.context.can_start_tick, "End of tick {}, starting tick {}.", self.context.current_tick, self.context.current_tick + TickDuration::SINGLE_TICK, @@ -383,7 +354,7 @@ impl<'a> Hydroflow<'a> { self.context.current_stratum = 0; self.context.current_tick += TickDuration::SINGLE_TICK; - self.events_received_tick = false; + self.context.events_received_tick = false; if current_tick_only { tracing::trace!( @@ -392,7 +363,7 @@ impl<'a> Hydroflow<'a> { return false; } else { self.try_recv_events(); - if std::mem::replace(&mut self.can_start_tick, false) { + if std::mem::replace(&mut self.context.can_start_tick, false) { tracing::trace!( tick = u64::from(self.context.current_tick), "`can_start_tick` is `true`, continuing." @@ -404,7 +375,7 @@ impl<'a> Hydroflow<'a> { tracing::trace!( "`can_start_tick` is `false`, re-setting `events_received_tick = false`, returning `false`." ); - self.events_received_tick = false; + self.context.events_received_tick = false; return false; } } @@ -417,7 +388,7 @@ impl<'a> Hydroflow<'a> { // returned true. Therefore we can return false without checking. // Also means nothing was done so we can reset the stratum to zero and wait for // events. - self.events_received_tick = false; + self.context.events_received_tick = false; self.context.current_stratum = 0; return false; } @@ -450,10 +421,10 @@ impl<'a> Hydroflow<'a> { /// Enqueues subgraphs triggered by events without blocking. /// /// Returns the number of subgraphs enqueued, and if any were external. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub fn try_recv_events(&mut self) -> usize { let mut enqueued_count = 0; - while let Ok((sg_id, is_external)) = self.event_queue_recv.try_recv() { + while let Ok((sg_id, is_external)) = self.context.event_queue_recv.try_recv() { let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -462,34 +433,36 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); enqueued_count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } } } - self.events_received_tick = true; + self.context.events_received_tick = true; enqueued_count } /// Enqueues subgraphs triggered by external events, blocking until at /// least one subgraph is scheduled **from an external event**. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub fn recv_events(&mut self) -> Option { let mut count = 0; loop { - let (sg_id, is_external) = self.event_queue_recv.blocking_recv()?; + let (sg_id, is_external) = self.context.event_queue_recv.blocking_recv()?; let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -498,24 +471,26 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } break; } } - self.events_received_tick = true; + self.context.events_received_tick = true; // Enqueue any other immediate events. let extra_count = self.try_recv_events(); @@ -527,12 +502,12 @@ impl<'a> Hydroflow<'a> { /// which may be zero if an external event scheduled an already-scheduled subgraph. /// /// Returns `None` if the event queue is closed, but that should not happen normally. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub async fn recv_events_async(&mut self) -> Option { let mut count = 0; loop { tracing::trace!("Awaiting events (`event_queue_recv`)."); - let (sg_id, is_external) = self.event_queue_recv.recv().await?; + let (sg_id, is_external) = self.context.event_queue_recv.recv().await?; let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -541,24 +516,26 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } break; } } - self.events_received_tick = true; + self.context.events_received_tick = true; // Enqueue any other immediate events. let extra_count = self.try_recv_events(); @@ -570,7 +547,7 @@ impl<'a> Hydroflow<'a> { let sg_data = &self.subgraphs[sg_id.0]; let already_scheduled = sg_data.is_scheduled.replace(true); if !already_scheduled { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); true } else { false @@ -632,8 +609,8 @@ impl<'a> Hydroflow<'a> { true, laziness, )); - self.init_stratum(stratum); - self.stratum_queues[stratum].push_back(sg_id); + self.context.init_stratum(stratum); + self.context.stratum_queues[stratum].push_back(sg_id); sg_id } @@ -724,20 +701,12 @@ impl<'a> Hydroflow<'a> { true, false, )); - self.init_stratum(stratum); - self.stratum_queues[stratum].push_back(sg_id); + self.context.init_stratum(stratum); + self.context.stratum_queues[stratum].push_back(sg_id); sg_id } - /// Makes sure stratum STRATUM is initialized. - fn init_stratum(&mut self, stratum: usize) { - if self.stratum_queues.len() <= stratum { - self.stratum_queues - .resize_with(stratum + 1, Default::default); - } - } - /// Creates a handoff edge and returns the corresponding send and receive ports. pub fn make_edge(&mut self, name: Name) -> (SendPort, RecvPort) where