From 55d27e013b4a5064cabc36daaac338e7ef6f649a Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Thu, 16 Nov 2023 11:24:49 -0800 Subject: [PATCH 1/2] scheduler bug --- hydroflow/examples/schedule_bug/README.md | 6 ++++ hydroflow/examples/schedule_bug/main.rs | 32 +++++++++++++++++++++ hydroflow/src/scheduled/graph.rs | 13 ++++++++- hydroflow_lang/src/graph/hydroflow_graph.rs | 4 +-- 4 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 hydroflow/examples/schedule_bug/README.md create mode 100644 hydroflow/examples/schedule_bug/main.rs diff --git a/hydroflow/examples/schedule_bug/README.md b/hydroflow/examples/schedule_bug/README.md new file mode 100644 index 000000000000..d9f1e786bad2 --- /dev/null +++ b/hydroflow/examples/schedule_bug/README.md @@ -0,0 +1,6 @@ +# Hello World + +To run: +``` +cargo run -p hydroflow --example hello_world +``` diff --git a/hydroflow/examples/schedule_bug/main.rs b/hydroflow/examples/schedule_bug/main.rs new file mode 100644 index 000000000000..3ea8a400be6e --- /dev/null +++ b/hydroflow/examples/schedule_bug/main.rs @@ -0,0 +1,32 @@ +use hydroflow::hydroflow_syntax; +use multiplatform_test::multiplatform_test; + +pub fn main() { + let mut df = hydroflow_syntax! { + // 3v1 + source_iter([1]) -> items; + items = union(); + + double = items + -> persist() + -> fold(|| 0, |accum, x| *accum += x) + -> defer_tick_lazy() + -> filter(|_| false) + -> tee(); + + double -> null(); + + double -> items; + }; + df.meta_graph() + .unwrap() + .open_mermaid(&Default::default()) + .unwrap(); + + df.run_available(); +} + +// #[multiplatform_test(test, env_tracing)] +// fn my_test() { +// main(); +// } diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 5ca3d03ccb3e..34e62535faca 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -201,7 +201,11 @@ impl<'a> Hydroflow<'a> { let sg_data = &mut self.subgraphs[sg_id.0]; // This must be true for the subgraph to be enqueued. assert!(sg_data.is_scheduled.take()); - tracing::trace!(sg_id = sg_id.0, "Running subgraph."); + tracing::trace!( + sg_id = sg_id.0, + sg_name = &*sg_data.name, + "Running subgraph." + ); self.context.subgraph_id = sg_id; self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in; @@ -218,10 +222,16 @@ impl<'a> Hydroflow<'a> { let succ_sg_data = &self.subgraphs[succ_id.0]; // If we have sent data to the next tick, then we can start the next tick. if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy { + tracing::trace!("Data sent to next tick, setting `can_start_tick = true`."); self.can_start_tick = true; } // Add subgraph to stratum queue if it is not already scheduled. if !succ_sg_data.is_scheduled.replace(true) { + tracing::trace!( + sg_id = succ_id.0, + sg_name = &*succ_sg_data.name, + "Successor subgraph scheduled." + ); self.stratum_queues[succ_sg_data.stratum].push_back(succ_id); } } @@ -369,6 +379,7 @@ impl<'a> Hydroflow<'a> { let sg_data = &self.subgraphs[sg_id.0]; tracing::trace!( sg_id = sg_id.0, + sg_name = &*sg_data.name, is_external = is_external, sg_stratum = sg_data.stratum, "Event received." diff --git a/hydroflow_lang/src/graph/hydroflow_graph.rs b/hydroflow_lang/src/graph/hydroflow_graph.rs index f1b046af69f2..b3f6ca00afc7 100644 --- a/hydroflow_lang/src/graph/hydroflow_graph.rs +++ b/hydroflow_lang/src/graph/hydroflow_graph.rs @@ -997,7 +997,7 @@ impl HydroflowGraph { } }; - let hoff_name = Literal::string(&*format!("Subgraph {:?}", subgraph_id)); + let sg_name = Literal::string(&*format!("Subgraph {:?}", subgraph_id)); let stratum = Literal::usize_unsuffixed( self.subgraph_stratum.get(subgraph_id).cloned().unwrap_or(0), ); @@ -1006,7 +1006,7 @@ impl HydroflowGraph { #( #op_prologue_code )* #hf.add_subgraph_stratified( - #hoff_name, + #sg_name, #stratum, var_expr!( #( #recv_ports ),* ), var_expr!( #( #send_ports ),* ), From 1224447b70052cd273ca8393610e3b09a299ad4d Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Mon, 20 Nov 2023 09:18:16 -0800 Subject: [PATCH 2/2] wip --- hydroflow/examples/schedule_bug/main.rs | 15 ++++++++--- hydroflow/examples/schedule_bug2/README.md | 6 +++++ hydroflow/examples/schedule_bug2/main.rs | 31 ++++++++++++++++++++++ 3 files changed, 49 insertions(+), 3 deletions(-) create mode 100644 hydroflow/examples/schedule_bug2/README.md create mode 100644 hydroflow/examples/schedule_bug2/main.rs diff --git a/hydroflow/examples/schedule_bug/main.rs b/hydroflow/examples/schedule_bug/main.rs index 3ea8a400be6e..e1dc3c79432b 100644 --- a/hydroflow/examples/schedule_bug/main.rs +++ b/hydroflow/examples/schedule_bug/main.rs @@ -2,15 +2,24 @@ use hydroflow::hydroflow_syntax; use multiplatform_test::multiplatform_test; pub fn main() { + { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + } + let mut df = hydroflow_syntax! { // 3v1 source_iter([1]) -> items; items = union(); double = items - -> persist() - -> fold(|| 0, |accum, x| *accum += x) - -> defer_tick_lazy() + // -> persist() + -> fold::<'static>(|| 0, |accum, x| *accum += x) + // -> defer_tick_lazy() + -> inspect(|x| println!("{} {}: {}", context.current_tick(), context.current_stratum(), x)) -> filter(|_| false) -> tee(); diff --git a/hydroflow/examples/schedule_bug2/README.md b/hydroflow/examples/schedule_bug2/README.md new file mode 100644 index 000000000000..d9f1e786bad2 --- /dev/null +++ b/hydroflow/examples/schedule_bug2/README.md @@ -0,0 +1,6 @@ +# Hello World + +To run: +``` +cargo run -p hydroflow --example hello_world +``` diff --git a/hydroflow/examples/schedule_bug2/main.rs b/hydroflow/examples/schedule_bug2/main.rs new file mode 100644 index 000000000000..25689a8d457f --- /dev/null +++ b/hydroflow/examples/schedule_bug2/main.rs @@ -0,0 +1,31 @@ +use hydroflow::hydroflow_syntax; +use multiplatform_test::multiplatform_test; + +#[hydroflow::main] +pub async fn main() { + { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + } + + let mut df = hydroflow_syntax! { + source_iter([1]) + -> persist() + -> inspect(|x| println!("{} {}: {}", context.current_tick(), context.current_stratum(), x)) + -> null(); + }; + df.meta_graph() + .unwrap() + .open_mermaid(&Default::default()) + .unwrap(); + + df.run_available(); +} + +// #[multiplatform_test(test, env_tracing)] +// fn my_test() { +// main(); +// }