Skip to content

Commit

Permalink
Merge eb6b253 into sapling-pr-archive-shadaj
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Dec 6, 2024
2 parents 6b7e2ff + eb6b253 commit f09cf8c
Show file tree
Hide file tree
Showing 19 changed files with 220 additions and 224 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 49 additions & 21 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,54 @@
//! Module for the user-facing [`Context`] object.
//!
//! Provides APIs for state and scheduling.
use std::any::Any;
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::pin::Pin;

use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use web_time::SystemTime;

use super::state::StateHandle;
use super::{StateId, SubgraphId};
use crate::scheduled::ticks::TickInstant;

/// The main state of the Hydroflow instance, which is provided as a reference
/// to each operator as it is run.
/// The main state and scheduler of the Hydroflow instance. Provided as the `context` API to each
/// subgraph/operator as it is run.
///
/// As an optimization, each Hydroflow instances stores eactly one Context
/// inline, which allows us to avoid any construction/deconstruction costs.
/// Before the `Context` is provided to a running operator, the `subgraph_id`
/// field must be updated.
/// Each Hydroflow instances stores eactly one Context inline. Before the `Context` is provided to
/// a running operator, the `subgraph_id` field must be updated.
pub struct Context {
/// User-facing State API.
states: Vec<StateData>,

/// TODO(mingwei): separate scheduler into its own struct/trait?
/// Index is stratum, value is FIFO queue for that stratum.
pub(super) stratum_queues: Vec<VecDeque<SubgraphId>>,
/// Receive events, if second arg indicates if it is an external "important" event (true).
pub(super) event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>,
/// If external events or data can justify starting the next tick.
pub(super) can_start_tick: bool,
/// If the events have been received for this tick.
pub(super) events_received_tick: bool,

// TODO(mingwei): as long as this is here, it's impossible to know when all work is done.
// Second field (bool) is for if the event is an external "important" event (true).
pub(crate) event_queue_send: UnboundedSender<(SubgraphId, bool)>,
pub(super) event_queue_send: UnboundedSender<(SubgraphId, bool)>,

pub(crate) current_tick: TickInstant,
pub(crate) current_stratum: usize,
pub(super) current_tick: TickInstant,
pub(super) current_stratum: usize,

pub(crate) current_tick_start: SystemTime,
pub(crate) subgraph_last_tick_run_in: Option<TickInstant>,
pub(super) current_tick_start: SystemTime,
pub(super) subgraph_last_tick_run_in: Option<TickInstant>,

/// The SubgraphId of the currently running operator. When this context is
/// not being forwarded to a running operator, this field is (mostly)
/// meaningless.
pub(crate) subgraph_id: SubgraphId,
/// not being forwarded to a running operator, this field is meaningless.
pub(super) subgraph_id: SubgraphId,

tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,

Expand Down Expand Up @@ -206,13 +217,19 @@ impl Context {
futures::future::join_all(self.task_join_handles.drain(..)).await;
}
}
/// Internal APIs.
impl Context {
/// Create a new context for the Hydroflow graph instance, used internally.
pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self {
Context {

impl Default for Context {
fn default() -> Self {
let stratum_queues = vec![Default::default()]; // Always initialize stratum #0.
let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel();
Self {
states: Vec::new(),

stratum_queues,
event_queue_recv,
can_start_tick: false,
events_received_tick: false,

event_queue_send,

current_stratum: 0,
Expand All @@ -227,8 +244,19 @@ impl Context {
task_join_handles: Vec::new(),
}
}
}
/// Internal APIs.
impl Context {
/// Makes sure stratum STRATUM is initialized.
pub(super) fn init_stratum(&mut self, stratum: usize) {
if self.stratum_queues.len() <= stratum {
self.stratum_queues
.resize_with(stratum + 1, Default::default);
}
}

pub(crate) fn reset_state_at_end_of_tick(&mut self) {
/// Call this at the end of a tick,
pub(super) fn reset_state_at_end_of_tick(&mut self) {
for StateData { state, tick_reset } in self.states.iter_mut() {
if let Some(tick_reset) = tick_reset {
(tick_reset)(Box::deref_mut(state));
Expand Down
Loading

0 comments on commit f09cf8c

Please sign in to comment.