diff --git a/Cargo.lock b/Cargo.lock index d26a114e4e86..ca8aab2608b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1590,6 +1590,7 @@ dependencies = [ "hydroflow", "hydroflow_lang", "insta", + "match_box", "nameof", "prettyplease", "proc-macro-crate", @@ -2014,6 +2015,17 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "match_box" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33c1d4fa92364abfc42bcc58c201cfbb63ae80f5e471aac5c051db48dab6843" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/hydroflow/src/scheduled/context.rs b/hydroflow/src/scheduled/context.rs index 029f43736ba4..982277d2b1c0 100644 --- a/hydroflow/src/scheduled/context.rs +++ b/hydroflow/src/scheduled/context.rs @@ -1,12 +1,15 @@ //! Module for the user-facing [`Context`] object. +//! +//! Provides APIs for state and scheduling. use std::any::Any; +use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; use std::ops::DerefMut; use std::pin::Pin; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use web_time::SystemTime; @@ -14,30 +17,38 @@ use super::state::StateHandle; use super::{StateId, SubgraphId}; use crate::scheduled::ticks::TickInstant; -/// The main state of the Hydroflow instance, which is provided as a reference -/// to each operator as it is run. +/// The main state and scheduler of the Hydroflow instance. Provided as the `context` API to each +/// subgraph/operator as it is run. /// -/// As an optimization, each Hydroflow instances stores eactly one Context -/// inline, which allows us to avoid any construction/deconstruction costs. -/// Before the `Context` is provided to a running operator, the `subgraph_id` -/// field must be updated. +/// Each Hydroflow instances stores eactly one Context inline. Before the `Context` is provided to +/// a running operator, the `subgraph_id` field must be updated. pub struct Context { + /// User-facing State API. states: Vec, + /// TODO(mingwei): separate scheduler into its own struct/trait? + /// Index is stratum, value is FIFO queue for that stratum. + pub(super) stratum_queues: Vec>, + /// Receive events, if second arg indicates if it is an external "important" event (true). + pub(super) event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>, + /// If external events or data can justify starting the next tick. + pub(super) can_start_tick: bool, + /// If the events have been received for this tick. + pub(super) events_received_tick: bool, + // TODO(mingwei): as long as this is here, it's impossible to know when all work is done. // Second field (bool) is for if the event is an external "important" event (true). - pub(crate) event_queue_send: UnboundedSender<(SubgraphId, bool)>, + pub(super) event_queue_send: UnboundedSender<(SubgraphId, bool)>, - pub(crate) current_tick: TickInstant, - pub(crate) current_stratum: usize, + pub(super) current_tick: TickInstant, + pub(super) current_stratum: usize, - pub(crate) current_tick_start: SystemTime, - pub(crate) subgraph_last_tick_run_in: Option, + pub(super) current_tick_start: SystemTime, + pub(super) subgraph_last_tick_run_in: Option, /// The SubgraphId of the currently running operator. When this context is - /// not being forwarded to a running operator, this field is (mostly) - /// meaningless. - pub(crate) subgraph_id: SubgraphId, + /// not being forwarded to a running operator, this field is meaningless. + pub(super) subgraph_id: SubgraphId, tasks_to_spawn: Vec + 'static>>>, @@ -206,13 +217,19 @@ impl Context { futures::future::join_all(self.task_join_handles.drain(..)).await; } } -/// Internal APIs. -impl Context { - /// Create a new context for the Hydroflow graph instance, used internally. - pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self { - Context { + +impl Default for Context { + fn default() -> Self { + let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. + let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); + Self { states: Vec::new(), + stratum_queues, + event_queue_recv, + can_start_tick: false, + events_received_tick: false, + event_queue_send, current_stratum: 0, @@ -227,8 +244,19 @@ impl Context { task_join_handles: Vec::new(), } } +} +/// Internal APIs. +impl Context { + /// Makes sure stratum STRATUM is initialized. + pub(super) fn init_stratum(&mut self, stratum: usize) { + if self.stratum_queues.len() <= stratum { + self.stratum_queues + .resize_with(stratum + 1, Default::default); + } + } - pub(crate) fn reset_state_at_end_of_tick(&mut self) { + /// Call this at the end of a tick, + pub(super) fn reset_state_at_end_of_tick(&mut self) { for StateData { state, tick_reset } in self.states.iter_mut() { if let Some(tick_reset) = tick_reset { (tick_reset)(Box::deref_mut(state)); diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 2d4e96962ee2..8c36f3bf7fb2 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -3,7 +3,6 @@ use std::any::Any; use std::borrow::Cow; use std::cell::Cell; -use std::collections::VecDeque; use std::future::Future; use std::marker::PhantomData; @@ -11,7 +10,6 @@ use hydroflow_lang::diagnostic::{Diagnostic, SerdeSpan}; use hydroflow_lang::graph::HydroflowGraph; use ref_cast::RefCast; use smallvec::SmallVec; -use tokio::sync::mpsc::{self, UnboundedReceiver}; use web_time::SystemTime; use super::context::Context; @@ -26,47 +24,18 @@ use crate::scheduled::ticks::{TickDuration, TickInstant}; use crate::Never; /// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs. +#[derive(Default)] pub struct Hydroflow<'a> { pub(super) subgraphs: Vec>, pub(super) context: Context, handoffs: Vec, - /// TODO(mingwei): separate scheduler into its own struct/trait? - /// Index is stratum, value is FIFO queue for that stratum. - stratum_queues: Vec>, - /// Receive events, if second arg indicates if it is an external "important" event (true). - event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>, - /// If external events or data can justify starting the next tick. - can_start_tick: bool, - /// If the events have been received for this tick. - events_received_tick: bool, - /// See [`Self::meta_graph()`]. meta_graph: Option, /// See [`Self::diagnostics()`]. diagnostics: Option>>, } -impl Default for Hydroflow<'_> { - fn default() -> Self { - let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. - let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); - let context = Context::new(event_queue_send); - Self { - subgraphs: Vec::new(), - context, - handoffs: Vec::new(), - - stratum_queues, - event_queue_recv, - can_start_tick: false, - events_received_tick: false, - - meta_graph: None, - diagnostics: None, - } - } -} /// Methods for [`TeeingHandoff`] teeing and dropping. impl Hydroflow<'_> { @@ -279,7 +248,9 @@ impl<'a> Hydroflow<'a> { let mut work_done = false; - while let Some(sg_id) = self.stratum_queues[self.context.current_stratum].pop_front() { + while let Some(sg_id) = + self.context.stratum_queues[self.context.current_stratum].pop_front() + { work_done = true; { let sg_data = &mut self.subgraphs[sg_id.0]; @@ -305,11 +276,11 @@ impl<'a> Hydroflow<'a> { let succ_sg_data = &self.subgraphs[succ_id.0]; // If we have sent data to the next tick, then we can start the next tick. if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy { - self.can_start_tick = true; + self.context.can_start_tick = true; } // Add subgraph to stratum queue if it is not already scheduled. if !succ_sg_data.is_scheduled.replace(true) { - self.stratum_queues[succ_sg_data.stratum].push_back(succ_id); + self.context.stratum_queues[succ_sg_data.stratum].push_back(succ_id); } } } @@ -332,19 +303,19 @@ impl<'a> Hydroflow<'a> { #[tracing::instrument(level = "trace", skip(self), ret)] pub fn next_stratum(&mut self, current_tick_only: bool) -> bool { tracing::trace!( - events_received_tick = self.events_received_tick, - can_start_tick = self.can_start_tick, + events_received_tick = self.context.events_received_tick, + can_start_tick = self.context.can_start_tick, "Starting `next_stratum` call.", ); if 0 == self.context.current_stratum { // Starting the tick, reset this to `false`. tracing::trace!("Starting tick, setting `can_start_tick = false`."); - self.can_start_tick = false; + self.context.can_start_tick = false; self.context.current_tick_start = SystemTime::now(); // Ensure external events are received before running the tick. - if !self.events_received_tick { + if !self.context.events_received_tick { // Add any external jobs to ready queue. self.try_recv_events(); } @@ -361,7 +332,7 @@ impl<'a> Hydroflow<'a> { ); // If current stratum has work, return true. - if !self.stratum_queues[self.context.current_stratum].is_empty() { + if !self.context.stratum_queues[self.context.current_stratum].is_empty() { tracing::trace!( tick = u64::from(self.context.current_tick), stratum = self.context.current_stratum, @@ -372,9 +343,9 @@ impl<'a> Hydroflow<'a> { // Increment stratum counter. self.context.current_stratum += 1; - if self.context.current_stratum >= self.stratum_queues.len() { + if self.context.current_stratum >= self.context.stratum_queues.len() { tracing::trace!( - can_start_tick = self.can_start_tick, + can_start_tick = self.context.can_start_tick, "End of tick {}, starting tick {}.", self.context.current_tick, self.context.current_tick + TickDuration::SINGLE_TICK, @@ -383,7 +354,7 @@ impl<'a> Hydroflow<'a> { self.context.current_stratum = 0; self.context.current_tick += TickDuration::SINGLE_TICK; - self.events_received_tick = false; + self.context.events_received_tick = false; if current_tick_only { tracing::trace!( @@ -392,7 +363,7 @@ impl<'a> Hydroflow<'a> { return false; } else { self.try_recv_events(); - if std::mem::replace(&mut self.can_start_tick, false) { + if std::mem::replace(&mut self.context.can_start_tick, false) { tracing::trace!( tick = u64::from(self.context.current_tick), "`can_start_tick` is `true`, continuing." @@ -404,7 +375,7 @@ impl<'a> Hydroflow<'a> { tracing::trace!( "`can_start_tick` is `false`, re-setting `events_received_tick = false`, returning `false`." ); - self.events_received_tick = false; + self.context.events_received_tick = false; return false; } } @@ -417,7 +388,7 @@ impl<'a> Hydroflow<'a> { // returned true. Therefore we can return false without checking. // Also means nothing was done so we can reset the stratum to zero and wait for // events. - self.events_received_tick = false; + self.context.events_received_tick = false; self.context.current_stratum = 0; return false; } @@ -450,10 +421,10 @@ impl<'a> Hydroflow<'a> { /// Enqueues subgraphs triggered by events without blocking. /// /// Returns the number of subgraphs enqueued, and if any were external. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub fn try_recv_events(&mut self) -> usize { let mut enqueued_count = 0; - while let Ok((sg_id, is_external)) = self.event_queue_recv.try_recv() { + while let Ok((sg_id, is_external)) = self.context.event_queue_recv.try_recv() { let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -462,34 +433,36 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); enqueued_count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } } } - self.events_received_tick = true; + self.context.events_received_tick = true; enqueued_count } /// Enqueues subgraphs triggered by external events, blocking until at /// least one subgraph is scheduled **from an external event**. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub fn recv_events(&mut self) -> Option { let mut count = 0; loop { - let (sg_id, is_external) = self.event_queue_recv.blocking_recv()?; + let (sg_id, is_external) = self.context.event_queue_recv.blocking_recv()?; let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -498,24 +471,26 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } break; } } - self.events_received_tick = true; + self.context.events_received_tick = true; // Enqueue any other immediate events. let extra_count = self.try_recv_events(); @@ -527,12 +502,12 @@ impl<'a> Hydroflow<'a> { /// which may be zero if an external event scheduled an already-scheduled subgraph. /// /// Returns `None` if the event queue is closed, but that should not happen normally. - #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.events_received_tick), ret)] + #[tracing::instrument(level = "trace", skip(self), fields(events_received_tick = self.context.events_received_tick), ret)] pub async fn recv_events_async(&mut self) -> Option { let mut count = 0; loop { tracing::trace!("Awaiting events (`event_queue_recv`)."); - let (sg_id, is_external) = self.event_queue_recv.recv().await?; + let (sg_id, is_external) = self.context.event_queue_recv.recv().await?; let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, @@ -541,24 +516,26 @@ impl<'a> Hydroflow<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); count += 1; } if is_external { // Next tick is triggered if we are at the start of the next tick (`!self.events_receved_tick`). // Or if the stratum is in the next tick. - if !self.events_received_tick || sg_data.stratum < self.context.current_stratum { + if !self.context.events_received_tick + || sg_data.stratum < self.context.current_stratum + { tracing::trace!( current_stratum = self.context.current_stratum, sg_stratum = sg_data.stratum, "External event, setting `can_start_tick = true`." ); - self.can_start_tick = true; + self.context.can_start_tick = true; } break; } } - self.events_received_tick = true; + self.context.events_received_tick = true; // Enqueue any other immediate events. let extra_count = self.try_recv_events(); @@ -570,7 +547,7 @@ impl<'a> Hydroflow<'a> { let sg_data = &self.subgraphs[sg_id.0]; let already_scheduled = sg_data.is_scheduled.replace(true); if !already_scheduled { - self.stratum_queues[sg_data.stratum].push_back(sg_id); + self.context.stratum_queues[sg_data.stratum].push_back(sg_id); true } else { false @@ -632,8 +609,8 @@ impl<'a> Hydroflow<'a> { true, laziness, )); - self.init_stratum(stratum); - self.stratum_queues[stratum].push_back(sg_id); + self.context.init_stratum(stratum); + self.context.stratum_queues[stratum].push_back(sg_id); sg_id } @@ -724,20 +701,12 @@ impl<'a> Hydroflow<'a> { true, false, )); - self.init_stratum(stratum); - self.stratum_queues[stratum].push_back(sg_id); + self.context.init_stratum(stratum); + self.context.stratum_queues[stratum].push_back(sg_id); sg_id } - /// Makes sure stratum STRATUM is initialized. - fn init_stratum(&mut self, stratum: usize) { - if self.stratum_queues.len() <= stratum { - self.stratum_queues - .resize_with(stratum + 1, Default::default); - } - } - /// Creates a handoff edge and returns the corresponding send and receive ports. pub fn make_edge(&mut self, name: Name) -> (SendPort, RecvPort) where diff --git a/hydroflow/tests/snapshots/surface_examples__example_2_simple_1.snap b/hydroflow/tests/snapshots/surface_examples__example_2_simple_1.snap index cf167828f0c6..90de61d5dbe4 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_2_simple_1.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_2_simple_1.snap @@ -14,4 +14,3 @@ Howdy 64 Howdy 65 Howdy 81 Howdy 82 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_2_simple_2.snap b/hydroflow/tests/snapshots/surface_examples__example_2_simple_2.snap index 73757ce90bcf..c71290ae9336 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_2_simple_2.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_2_simple_2.snap @@ -14,4 +14,3 @@ G'day 64 G'day 65 G'day 81 G'day 82 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_3_stream.snap b/hydroflow/tests/snapshots/surface_examples__example_3_stream.snap index 0df8637c3972..bd523917a67a 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_3_stream.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_3_stream.snap @@ -16,4 +16,3 @@ Ahoy, 64 Ahoy, 65 Ahoy, 81 Ahoy, 82 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_4_neighbors.snap b/hydroflow/tests/snapshots/surface_examples__example_4_neighbors.snap index ebd58d9f81e1..7b931190d85a 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_4_neighbors.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_4_neighbors.snap @@ -44,4 +44,3 @@ end Reached: 0 Reached: 1 Reached: 3 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap index 1575fcd20773..157ecc566559 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap @@ -60,4 +60,3 @@ Reached: 1 Reached: 3 Reached: 2 Reached: 4 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap index 73281bb1a1f4..2737c73983ac 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap @@ -98,4 +98,3 @@ Received vertex: 11 Received vertex: 12 unreached_vertices vertex: 11 unreached_vertices vertex: 12 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_1_basic.snap b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_1_basic.snap index 4ab296cb6a47..e3a2e0a2da93 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_1_basic.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_1_basic.snap @@ -4,4 +4,3 @@ expression: output --- HELLO WORLD - diff --git a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_2_varname.snap b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_2_varname.snap index 4ab296cb6a47..e3a2e0a2da93 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_2_varname.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_2_varname.snap @@ -4,4 +4,3 @@ expression: output --- HELLO WORLD - diff --git a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_3_ports.snap b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_3_ports.snap index f9fb5c6ec859..83ed8d6a0daf 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_3_ports.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_3_ports.snap @@ -47,4 +47,3 @@ hello world HELLO WORLD - diff --git a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_4_context.snap b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_4_context.snap index 3e819e2a4842..2008630a0c89 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_surface_flows_4_context.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_surface_flows_4_context.snap @@ -3,4 +3,3 @@ source: hydroflow/tests/surface_examples.rs expression: output --- Current tick: [0], stratum: 0 - diff --git a/hydroflow/tests/snapshots/surface_examples__example_syntax_hello_world.snap b/hydroflow/tests/snapshots/surface_examples__example_syntax_hello_world.snap index 4ab296cb6a47..e3a2e0a2da93 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_syntax_hello_world.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_syntax_hello_world.snap @@ -4,4 +4,3 @@ expression: output --- HELLO WORLD - diff --git a/hydroflow/tests/snapshots/surface_examples__example_syntax_input.snap b/hydroflow/tests/snapshots/surface_examples__example_syntax_input.snap index 4ab296cb6a47..e3a2e0a2da93 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_syntax_input.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_syntax_input.snap @@ -4,4 +4,3 @@ expression: output --- HELLO WORLD - diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml index 0023e55415f4..23ad3d334337 100644 --- a/hydroflow_plus/Cargo.toml +++ b/hydroflow_plus/Cargo.toml @@ -21,32 +21,32 @@ deploy_runtime = [ "hydroflow/deploy_integration" ] deploy = [ "deploy_runtime", "dep:hydro_deploy", "dep:trybuild-internals-api", "dep:toml", "dep:prettyplease" ] [dependencies] -quote = "1.0.35" -syn = { version = "2.0.46", features = [ "parsing", "extra-traits", "visit-mut" ] } -proc-macro2 = "1.0.74" -proc-macro-crate = "1.0.0" +bincode = "1.3.1" +hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0", optional = true } hydroflow = { path = "../hydroflow", version = "^0.10.0", default-features = false } hydroflow_lang = { path = "../hydroflow_lang", version = "^0.10.0" } -serde = { version = "1.0.197", features = [ "derive" ] } -bincode = "1.3.1" -tokio = { version = "1.29.0", features = [ "full" ] } -stageleft = { path = "../stageleft", version = "^0.5.0" } - +match_box = "0.0.2" nameof = "1.0.0" +prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true } +proc-macro-crate = "1.0.0" +proc-macro2 = "1.0.74" +quote = "1.0.35" +sealed = "0.6.0" +serde = { version = "1.0.197", features = [ "derive" ] } sha2 = "0.10.0" +stageleft = { path = "../stageleft", version = "^0.5.0" } stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } -hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0", optional = true } -prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true } +syn = { version = "2.0.46", features = [ "parsing", "extra-traits", "visit-mut" ] } +tokio = { version = "1.29.0", features = [ "full" ] } toml = { version = "0.8.0", optional = true } trybuild-internals-api = { version = "1.0.99", optional = true } -sealed = "0.6.0" [build-dependencies] stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } [dev-dependencies] -insta = "1.39" -hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0" } async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } ctor = "0.2.8" +hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0" } +insta = "1.39" trybuild = "1" diff --git a/hydroflow_plus/src/deploy/trybuild.rs b/hydroflow_plus/src/deploy/trybuild.rs index 7922e342b91c..94fe61fbeadd 100644 --- a/hydroflow_plus/src/deploy/trybuild.rs +++ b/hydroflow_plus/src/deploy/trybuild.rs @@ -100,7 +100,6 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec ); let source_ast: syn::File = syn::parse_quote! { - #![feature(box_patterns)] #![allow(unused_imports, unused_crate_dependencies, missing_docs, non_snake_case)] use hydroflow_plus::*; diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index d9b25a8f5534..27ac6e51b4a7 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(box_patterns)] - stageleft::stageleft_no_entry_crate!(); pub use hydroflow; diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index 7176dfde8998..2924bf544847 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -7,114 +7,116 @@ fn persist_pullup_node( node: &mut HfPlusNode, persist_pulled_tees: &mut HashSet<*const RefCell>, ) { - *node = match std::mem::replace(node, HfPlusNode::Placeholder) { - HfPlusNode::Unpersist(box HfPlusNode::Persist(box behind_persist)) => behind_persist, - - HfPlusNode::Delta(box HfPlusNode::Persist(box behind_persist)) => behind_persist, - - HfPlusNode::Tee { inner } => { - if persist_pulled_tees.contains(&(inner.0.as_ref() as *const RefCell)) { - HfPlusNode::Persist(Box::new(HfPlusNode::Tee { - inner: TeeNode(inner.0.clone()), - })) - } else if matches!(*inner.0.borrow(), HfPlusNode::Persist(_)) { - persist_pulled_tees.insert(inner.0.as_ref() as *const RefCell); - if let HfPlusNode::Persist(box behind_persist) = - inner.0.replace(HfPlusNode::Placeholder) - { - *inner.0.borrow_mut() = behind_persist; + *node = match_box::match_box! { + match std::mem::replace(node, HfPlusNode::Placeholder) { + HfPlusNode::Unpersist(mb!(* HfPlusNode::Persist(mb!(* behind_persist)))) => behind_persist, + + HfPlusNode::Delta(mb!(* HfPlusNode::Persist(mb!(* behind_persist)))) => behind_persist, + + HfPlusNode::Tee { inner } => { + if persist_pulled_tees.contains(&(inner.0.as_ref() as *const RefCell)) { + HfPlusNode::Persist(Box::new(HfPlusNode::Tee { + inner: TeeNode(inner.0.clone()), + })) + } else if matches!(*inner.0.borrow(), HfPlusNode::Persist(_)) { + persist_pulled_tees.insert(inner.0.as_ref() as *const RefCell); + if let HfPlusNode::Persist(behind_persist) = + inner.0.replace(HfPlusNode::Placeholder) + { + *inner.0.borrow_mut() = *behind_persist; + } else { + unreachable!() + } + + HfPlusNode::Persist(Box::new(HfPlusNode::Tee { + inner: TeeNode(inner.0.clone()), + })) } else { - unreachable!() + HfPlusNode::Tee { inner } } + } - HfPlusNode::Persist(Box::new(HfPlusNode::Tee { - inner: TeeNode(inner.0.clone()), - })) - } else { - HfPlusNode::Tee { inner } + HfPlusNode::Map { + f, + input: mb!(* HfPlusNode::Persist(behind_persist)), + } => HfPlusNode::Persist(Box::new(HfPlusNode::Map { + f, + input: behind_persist, + })), + + HfPlusNode::FilterMap { + f, + input: mb!(* HfPlusNode::Persist(behind_persist)), + } => HfPlusNode::Persist(Box::new(HfPlusNode::FilterMap { + f, + input: behind_persist, + })), + + HfPlusNode::FlatMap { + f, + input: mb!(* HfPlusNode::Persist(behind_persist)), + } => HfPlusNode::Persist(Box::new(HfPlusNode::FlatMap { + f, + input: behind_persist, + })), + + HfPlusNode::Filter { + f, + input: mb!(* HfPlusNode::Persist(behind_persist)), + } => HfPlusNode::Persist(Box::new(HfPlusNode::Filter { + f, + input: behind_persist, + })), + + HfPlusNode::Network { + from_location, + from_key, + to_location, + to_key, + serialize_pipeline, + instantiate_fn, + deserialize_pipeline, + input: mb!(* HfPlusNode::Persist(behind_persist)), + .. + } => HfPlusNode::Persist(Box::new(HfPlusNode::Network { + from_location, + from_key, + to_location, + to_key, + serialize_pipeline, + instantiate_fn, + deserialize_pipeline, + input: behind_persist, + })), + + HfPlusNode::Chain(mb!(* HfPlusNode::Persist(left)), mb!(* HfPlusNode::Persist(right))) => { + HfPlusNode::Persist(Box::new(HfPlusNode::Chain(left, right))) } - } - HfPlusNode::Map { - f, - input: box HfPlusNode::Persist(behind_persist), - } => HfPlusNode::Persist(Box::new(HfPlusNode::Map { - f, - input: behind_persist, - })), - - HfPlusNode::FilterMap { - f, - input: box HfPlusNode::Persist(behind_persist), - } => HfPlusNode::Persist(Box::new(HfPlusNode::FilterMap { - f, - input: behind_persist, - })), - - HfPlusNode::FlatMap { - f, - input: box HfPlusNode::Persist(behind_persist), - } => HfPlusNode::Persist(Box::new(HfPlusNode::FlatMap { - f, - input: behind_persist, - })), - - HfPlusNode::Filter { - f, - input: box HfPlusNode::Persist(behind_persist), - } => HfPlusNode::Persist(Box::new(HfPlusNode::Filter { - f, - input: behind_persist, - })), - - HfPlusNode::Network { - from_location, - from_key, - to_location, - to_key, - serialize_pipeline, - instantiate_fn, - deserialize_pipeline, - input: box HfPlusNode::Persist(behind_persist), - .. - } => HfPlusNode::Persist(Box::new(HfPlusNode::Network { - from_location, - from_key, - to_location, - to_key, - serialize_pipeline, - instantiate_fn, - deserialize_pipeline, - input: behind_persist, - })), - - HfPlusNode::Chain(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { - HfPlusNode::Persist(Box::new(HfPlusNode::Chain(left, right))) - } + HfPlusNode::CrossProduct(mb!(* HfPlusNode::Persist(left)), mb!(* HfPlusNode::Persist(right))) => { + HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new( + HfPlusNode::CrossProduct( + Box::new(HfPlusNode::Persist(left)), + Box::new(HfPlusNode::Persist(right)), + ), + )))) + } - HfPlusNode::CrossProduct(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { - HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new( - HfPlusNode::CrossProduct( + HfPlusNode::Join(mb!(* HfPlusNode::Persist(left)), mb!(* HfPlusNode::Persist(right))) => { + HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new(HfPlusNode::Join( Box::new(HfPlusNode::Persist(left)), Box::new(HfPlusNode::Persist(right)), - ), - )))) - } + ))))) + } - HfPlusNode::Join(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { - HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new(HfPlusNode::Join( - Box::new(HfPlusNode::Persist(left)), - Box::new(HfPlusNode::Persist(right)), - ))))) - } + HfPlusNode::Unique(mb!(* HfPlusNode::Persist(inner))) => { + HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new(HfPlusNode::Unique( + Box::new(HfPlusNode::Persist(inner)), + ))))) + } - HfPlusNode::Unique(box HfPlusNode::Persist(inner)) => { - HfPlusNode::Persist(Box::new(HfPlusNode::Delta(Box::new(HfPlusNode::Unique( - Box::new(HfPlusNode::Persist(inner)), - ))))) + node => node, } - - node => node, }; }