diff --git a/hydroflow/examples/schedule_bug/README.md b/hydroflow/examples/schedule_bug/README.md new file mode 100644 index 000000000000..d9f1e786bad2 --- /dev/null +++ b/hydroflow/examples/schedule_bug/README.md @@ -0,0 +1,6 @@ +# Hello World + +To run: +``` +cargo run -p hydroflow --example hello_world +``` diff --git a/hydroflow/examples/schedule_bug/main.rs b/hydroflow/examples/schedule_bug/main.rs new file mode 100644 index 000000000000..e1dc3c79432b --- /dev/null +++ b/hydroflow/examples/schedule_bug/main.rs @@ -0,0 +1,41 @@ +use hydroflow::hydroflow_syntax; +use multiplatform_test::multiplatform_test; + +pub fn main() { + { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + } + + let mut df = hydroflow_syntax! { + // 3v1 + source_iter([1]) -> items; + items = union(); + + double = items + // -> persist() + -> fold::<'static>(|| 0, |accum, x| *accum += x) + // -> defer_tick_lazy() + -> inspect(|x| println!("{} {}: {}", context.current_tick(), context.current_stratum(), x)) + -> filter(|_| false) + -> tee(); + + double -> null(); + + double -> items; + }; + df.meta_graph() + .unwrap() + .open_mermaid(&Default::default()) + .unwrap(); + + df.run_available(); +} + +// #[multiplatform_test(test, env_tracing)] +// fn my_test() { +// main(); +// } diff --git a/hydroflow/examples/schedule_bug2/README.md b/hydroflow/examples/schedule_bug2/README.md new file mode 100644 index 000000000000..d9f1e786bad2 --- /dev/null +++ b/hydroflow/examples/schedule_bug2/README.md @@ -0,0 +1,6 @@ +# Hello World + +To run: +``` +cargo run -p hydroflow --example hello_world +``` diff --git a/hydroflow/examples/schedule_bug2/main.rs b/hydroflow/examples/schedule_bug2/main.rs new file mode 100644 index 000000000000..25689a8d457f --- /dev/null +++ b/hydroflow/examples/schedule_bug2/main.rs @@ -0,0 +1,31 @@ +use hydroflow::hydroflow_syntax; +use multiplatform_test::multiplatform_test; + +#[hydroflow::main] +pub async fn main() { + { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + } + + let mut df = hydroflow_syntax! { + source_iter([1]) + -> persist() + -> inspect(|x| println!("{} {}: {}", context.current_tick(), context.current_stratum(), x)) + -> null(); + }; + df.meta_graph() + .unwrap() + .open_mermaid(&Default::default()) + .unwrap(); + + df.run_available(); +} + +// #[multiplatform_test(test, env_tracing)] +// fn my_test() { +// main(); +// } diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 5ca3d03ccb3e..34e62535faca 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -201,7 +201,11 @@ impl<'a> Hydroflow<'a> { let sg_data = &mut self.subgraphs[sg_id.0]; // This must be true for the subgraph to be enqueued. assert!(sg_data.is_scheduled.take()); - tracing::trace!(sg_id = sg_id.0, "Running subgraph."); + tracing::trace!( + sg_id = sg_id.0, + sg_name = &*sg_data.name, + "Running subgraph." + ); self.context.subgraph_id = sg_id; self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in; @@ -218,10 +222,16 @@ impl<'a> Hydroflow<'a> { let succ_sg_data = &self.subgraphs[succ_id.0]; // If we have sent data to the next tick, then we can start the next tick. if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy { + tracing::trace!("Data sent to next tick, setting `can_start_tick = true`."); self.can_start_tick = true; } // Add subgraph to stratum queue if it is not already scheduled. if !succ_sg_data.is_scheduled.replace(true) { + tracing::trace!( + sg_id = succ_id.0, + sg_name = &*succ_sg_data.name, + "Successor subgraph scheduled." + ); self.stratum_queues[succ_sg_data.stratum].push_back(succ_id); } } @@ -369,6 +379,7 @@ impl<'a> Hydroflow<'a> { let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, + sg_name = &*sg_data.name, is_external = is_external, sg_stratum = sg_data.stratum, "Event received." diff --git a/hydroflow_lang/src/graph/hydroflow_graph.rs b/hydroflow_lang/src/graph/hydroflow_graph.rs index f1b046af69f2..b3f6ca00afc7 100644 --- a/hydroflow_lang/src/graph/hydroflow_graph.rs +++ b/hydroflow_lang/src/graph/hydroflow_graph.rs @@ -997,7 +997,7 @@ impl HydroflowGraph { } }; - let hoff_name = Literal::string(&*format!("Subgraph {:?}", subgraph_id)); + let sg_name = Literal::string(&*format!("Subgraph {:?}", subgraph_id)); let stratum = Literal::usize_unsuffixed( self.subgraph_stratum.get(subgraph_id).cloned().unwrap_or(0), ); @@ -1006,7 +1006,7 @@ impl HydroflowGraph { #( #op_prologue_code )* #hf.add_subgraph_stratified( - #hoff_name, + #sg_name, #stratum, var_expr!( #( #recv_ports ),* ), var_expr!( #( #send_ports ),* ),