diff --git a/hydroflow_lang/src/graph/flat_graph_builder.rs b/hydroflow_lang/src/graph/flat_graph_builder.rs index b5e0fcfdcc56..a28e50856bea 100644 --- a/hydroflow_lang/src/graph/flat_graph_builder.rs +++ b/hydroflow_lang/src/graph/flat_graph_builder.rs @@ -14,6 +14,7 @@ use syn::{Error, Ident, ItemUse}; use super::ops::defer_tick::DEFER_TICK; use super::ops::source_iter::SOURCE_ITER; use super::ops::source_stream::SOURCE_STREAM; +use super::ops::FloType; use super::{GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId, HydroflowGraph, PortIndexValue}; use crate::diagnostic::{Diagnostic, Level}; use crate::graph::graph_algorithms; @@ -937,10 +938,9 @@ impl FlatGraphBuilder { let Some(_loop_id) = self.flat_graph.node_loop(node_id) else { continue; }; - // TODO(mingwei): don't hardcode source names, add prop to `op_constraints`. - if SOURCE_STREAM.name == op_inst.op_constraints.name - || SOURCE_ITER.name == op_inst.op_constraints.name - { + + // Source operators must be at the top level. + if Some(FloType::Source) == op_inst.op_constraints.flo_type { self.diagnostics.push(Diagnostic::spanned( node.span(), Level::Error, diff --git a/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs b/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs index d6f6f635fa21..4bdecb0c7858 100644 --- a/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs +++ b/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs @@ -42,6 +42,7 @@ pub const _LATTICE_FOLD_BATCH: OperatorConstraints = OperatorConstraints { num_args: 0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| PortListSpec::Fixed(parse_quote! { input, signal })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs b/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs index 7e945f0cc90c..f777acef489b 100644 --- a/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs +++ b/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs @@ -86,6 +86,7 @@ pub const _LATTICE_JOIN_FUSED_JOIN: OperatorConstraints = OperatorConstraints { type_args: &(2..=2), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/anti_join.rs b/hydroflow_lang/src/graph/ops/anti_join.rs index a81fcd48695a..c80f6384fd45 100644 --- a/hydroflow_lang/src/graph/ops/anti_join.rs +++ b/hydroflow_lang/src/graph/ops/anti_join.rs @@ -37,6 +37,7 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { // to prevent reading uncleared data if this subgraph doesn't run. // https://github.com/hydro-project/hydroflow/issues/1298 has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs index cf9a1e9441d4..0c37c202d48a 100644 --- a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs @@ -37,6 +37,7 @@ pub const ANTI_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { // to prevent reading uncleared data if this subgraph doesn't run. // https://github.com/hydro-project/hydroflow/issues/1298 has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/assert.rs b/hydroflow_lang/src/graph/ops/assert.rs index af3afeee9867..de859d5dddd7 100644 --- a/hydroflow_lang/src/graph/ops/assert.rs +++ b/hydroflow_lang/src/graph/ops/assert.rs @@ -26,6 +26,7 @@ pub const ASSERT: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/assert_eq.rs b/hydroflow_lang/src/graph/ops/assert_eq.rs index 0aca9dbd257f..2d08e58fcc70 100644 --- a/hydroflow_lang/src/graph/ops/assert_eq.rs +++ b/hydroflow_lang/src/graph/ops/assert_eq.rs @@ -37,6 +37,7 @@ pub const ASSERT_EQ: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/chain.rs b/hydroflow_lang/src/graph/ops/chain.rs index 96006fc396af..3ff9c90fdec4 100644 --- a/hydroflow_lang/src/graph/ops/chain.rs +++ b/hydroflow_lang/src/graph/ops/chain.rs @@ -30,6 +30,7 @@ pub const CHAIN: OperatorConstraints = OperatorConstraints { num_args: 0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/cross_join.rs b/hydroflow_lang/src/graph/ops/cross_join.rs index fe4c2aeed80f..c22d0225bc7a 100644 --- a/hydroflow_lang/src/graph/ops/cross_join.rs +++ b/hydroflow_lang/src/graph/ops/cross_join.rs @@ -45,6 +45,7 @@ pub const CROSS_JOIN: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/cross_join_multiset.rs b/hydroflow_lang/src/graph/ops/cross_join_multiset.rs index e050f2818c3e..5708800bcb28 100644 --- a/hydroflow_lang/src/graph/ops/cross_join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/cross_join_multiset.rs @@ -34,6 +34,7 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/cross_singleton.rs b/hydroflow_lang/src/graph/ops/cross_singleton.rs index ee9e2c8def78..6d4b3757e667 100644 --- a/hydroflow_lang/src/graph/ops/cross_singleton.rs +++ b/hydroflow_lang/src/graph/ops/cross_singleton.rs @@ -39,6 +39,7 @@ pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints { num_args: 0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/defer_signal.rs b/hydroflow_lang/src/graph/ops/defer_signal.rs index b8e6459cb233..2adb2b4a993d 100644 --- a/hydroflow_lang/src/graph/ops/defer_signal.rs +++ b/hydroflow_lang/src/graph/ops/defer_signal.rs @@ -34,6 +34,7 @@ pub const DEFER_SIGNAL: OperatorConstraints = OperatorConstraints { num_args: 0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, signal })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/defer_tick.rs b/hydroflow_lang/src/graph/ops/defer_tick.rs index fa25f1ee1a84..42c3d92ba8d8 100644 --- a/hydroflow_lang/src/graph/ops/defer_tick.rs +++ b/hydroflow_lang/src/graph/ops/defer_tick.rs @@ -72,6 +72,7 @@ pub const DEFER_TICK: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Tick), diff --git a/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs b/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs index 8417d9ae9a92..96c505f19c7b 100644 --- a/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs +++ b/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs @@ -17,6 +17,7 @@ pub const DEFER_TICK_LAZY: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::TickLazy), diff --git a/hydroflow_lang/src/graph/ops/demux.rs b/hydroflow_lang/src/graph/ops/demux.rs index b96fe44df4c5..bfedf7a9b8cc 100644 --- a/hydroflow_lang/src/graph/ops/demux.rs +++ b/hydroflow_lang/src/graph/ops/demux.rs @@ -53,6 +53,7 @@ pub const DEMUX: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/demux_enum.rs b/hydroflow_lang/src/graph/ops/demux_enum.rs index 2d3ea6e9e501..8b612c243539 100644 --- a/hydroflow_lang/src/graph/ops/demux_enum.rs +++ b/hydroflow_lang/src/graph/ops/demux_enum.rs @@ -52,6 +52,7 @@ pub const DEMUX_ENUM: OperatorConstraints = OperatorConstraints { type_args: RANGE_1, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_file.rs b/hydroflow_lang/src/graph/ops/dest_file.rs index f0190bfd07f5..0067cd53242d 100644 --- a/hydroflow_lang/src/graph/ops/dest_file.rs +++ b/hydroflow_lang/src/graph/ops/dest_file.rs @@ -32,6 +32,7 @@ pub const DEST_FILE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_sink.rs b/hydroflow_lang/src/graph/ops/dest_sink.rs index 7493acee85b9..ee2f5b38d610 100644 --- a/hydroflow_lang/src/graph/ops/dest_sink.rs +++ b/hydroflow_lang/src/graph/ops/dest_sink.rs @@ -93,6 +93,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_sink_serde.rs b/hydroflow_lang/src/graph/ops/dest_sink_serde.rs index a517655ae88e..78b8a7ceb5fb 100644 --- a/hydroflow_lang/src/graph/ops/dest_sink_serde.rs +++ b/hydroflow_lang/src/graph/ops/dest_sink_serde.rs @@ -35,6 +35,7 @@ pub const DEST_SINK_SERDE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/difference.rs b/hydroflow_lang/src/graph/ops/difference.rs index f626e86c5658..e2e8e83d2176 100644 --- a/hydroflow_lang/src/graph/ops/difference.rs +++ b/hydroflow_lang/src/graph/ops/difference.rs @@ -34,6 +34,7 @@ pub const DIFFERENCE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/difference_multiset.rs b/hydroflow_lang/src/graph/ops/difference_multiset.rs index 8c87e9991846..603ffe05f3de 100644 --- a/hydroflow_lang/src/graph/ops/difference_multiset.rs +++ b/hydroflow_lang/src/graph/ops/difference_multiset.rs @@ -35,6 +35,7 @@ pub const DIFFERENCE_MULTISET: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/enumerate.rs b/hydroflow_lang/src/graph/ops/enumerate.rs index e4ede20d40e7..c371870afc69 100644 --- a/hydroflow_lang/src/graph/ops/enumerate.rs +++ b/hydroflow_lang/src/graph/ops/enumerate.rs @@ -32,6 +32,7 @@ pub const ENUMERATE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/filter.rs b/hydroflow_lang/src/graph/ops/filter.rs index eea8150e1810..bb2906f93fe7 100644 --- a/hydroflow_lang/src/graph/ops/filter.rs +++ b/hydroflow_lang/src/graph/ops/filter.rs @@ -30,6 +30,7 @@ pub const FILTER: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/filter_map.rs b/hydroflow_lang/src/graph/ops/filter_map.rs index 0eb9a9d2bdfd..19f912849b1e 100644 --- a/hydroflow_lang/src/graph/ops/filter_map.rs +++ b/hydroflow_lang/src/graph/ops/filter_map.rs @@ -28,6 +28,7 @@ pub const FILTER_MAP: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/flat_map.rs b/hydroflow_lang/src/graph/ops/flat_map.rs index 94de32f24aaf..85aa66fb7ce5 100644 --- a/hydroflow_lang/src/graph/ops/flat_map.rs +++ b/hydroflow_lang/src/graph/ops/flat_map.rs @@ -32,6 +32,7 @@ pub const FLAT_MAP: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/flatten.rs b/hydroflow_lang/src/graph/ops/flatten.rs index 625a71d43195..533884da6570 100644 --- a/hydroflow_lang/src/graph/ops/flatten.rs +++ b/hydroflow_lang/src/graph/ops/flatten.rs @@ -27,6 +27,7 @@ pub const FLATTEN: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/fold.rs b/hydroflow_lang/src/graph/ops/fold.rs index 2cef16c6f2e8..7072b08c2a68 100644 --- a/hydroflow_lang/src/graph/ops/fold.rs +++ b/hydroflow_lang/src/graph/ops/fold.rs @@ -45,6 +45,7 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: true, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/fold_keyed.rs b/hydroflow_lang/src/graph/ops/fold_keyed.rs index c4572cae6b70..f91e3d2deadc 100644 --- a/hydroflow_lang/src/graph/ops/fold_keyed.rs +++ b/hydroflow_lang/src/graph/ops/fold_keyed.rs @@ -80,6 +80,7 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { // to prevent reading uncleared data if this subgraph doesn't run. // https://github.com/hydro-project/hydroflow/issues/1298 has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/for_each.rs b/hydroflow_lang/src/graph/ops/for_each.rs index 601b079e874e..e087aa8eb149 100644 --- a/hydroflow_lang/src/graph/ops/for_each.rs +++ b/hydroflow_lang/src/graph/ops/for_each.rs @@ -30,6 +30,7 @@ pub const FOR_EACH: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/identity.rs b/hydroflow_lang/src/graph/ops/identity.rs index a55bab030c63..ffc1f44a4fcd 100644 --- a/hydroflow_lang/src/graph/ops/identity.rs +++ b/hydroflow_lang/src/graph/ops/identity.rs @@ -33,6 +33,7 @@ pub const IDENTITY: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/initialize.rs b/hydroflow_lang/src/graph/ops/initialize.rs index 877d655a121a..e7dbe0834658 100644 --- a/hydroflow_lang/src/graph/ops/initialize.rs +++ b/hydroflow_lang/src/graph/ops/initialize.rs @@ -26,6 +26,7 @@ pub const INITIALIZE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/inspect.rs b/hydroflow_lang/src/graph/ops/inspect.rs index d99576a703c0..bdf888d92a2d 100644 --- a/hydroflow_lang/src/graph/ops/inspect.rs +++ b/hydroflow_lang/src/graph/ops/inspect.rs @@ -31,6 +31,7 @@ pub const INSPECT: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/join.rs b/hydroflow_lang/src/graph/ops/join.rs index 874d9ceaa118..764dc9b8ea34 100644 --- a/hydroflow_lang/src/graph/ops/join.rs +++ b/hydroflow_lang/src/graph/ops/join.rs @@ -91,6 +91,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/join_fused.rs b/hydroflow_lang/src/graph/ops/join_fused.rs index e7166086c53e..867310d674a1 100644 --- a/hydroflow_lang/src/graph/ops/join_fused.rs +++ b/hydroflow_lang/src/graph/ops/join_fused.rs @@ -100,6 +100,7 @@ pub const JOIN_FUSED: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs index 9ecefbc730c3..198e0bdb0806 100644 --- a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs +++ b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs @@ -34,6 +34,7 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/join_fused_rhs.rs b/hydroflow_lang/src/graph/ops/join_fused_rhs.rs index 04aae55a5063..e6f819b340d9 100644 --- a/hydroflow_lang/src/graph/ops/join_fused_rhs.rs +++ b/hydroflow_lang/src/graph/ops/join_fused_rhs.rs @@ -21,6 +21,7 @@ pub const JOIN_FUSED_RHS: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs index d5b446c71182..338dac329b60 100644 --- a/hydroflow_lang/src/graph/ops/join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/join_multiset.rs @@ -37,6 +37,7 @@ pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs index a5ef7183ec7c..176258ae3331 100644 --- a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs +++ b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs @@ -52,6 +52,7 @@ pub const LATTICE_BIMORPHISM: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/lattice_fold.rs b/hydroflow_lang/src/graph/ops/lattice_fold.rs index 5475c7303138..8d85945451eb 100644 --- a/hydroflow_lang/src/graph/ops/lattice_fold.rs +++ b/hydroflow_lang/src/graph/ops/lattice_fold.rs @@ -39,6 +39,7 @@ pub const LATTICE_FOLD: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/lattice_reduce.rs b/hydroflow_lang/src/graph/ops/lattice_reduce.rs index 2d5731769c98..a38a7dc2481f 100644 --- a/hydroflow_lang/src/graph/ops/lattice_reduce.rs +++ b/hydroflow_lang/src/graph/ops/lattice_reduce.rs @@ -40,6 +40,7 @@ pub const LATTICE_REDUCE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/map.rs b/hydroflow_lang/src/graph/ops/map.rs index ed49fe48bbe5..bd03ada64c5f 100644 --- a/hydroflow_lang/src/graph/ops/map.rs +++ b/hydroflow_lang/src/graph/ops/map.rs @@ -32,6 +32,7 @@ pub const MAP: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 52ad4105e03c..17d44df8e70c 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -70,6 +70,8 @@ pub struct OperatorConstraints { /// If true, [`WriteContextArgs::singleton_output_ident`] will be set to a meaningful value in /// the [`Self::write_fn`] invocation. pub has_singleton_output: bool, + /// Flo semantics type. + pub flo_type: Option, /// What named or numbered input ports to expect? pub ports_inn: Option PortListSpec>, @@ -541,3 +543,19 @@ impl OperatorCategory { } } } + +/// Operator type for Flo semantics. +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)] +pub enum FloType { +<<<<<<< HEAD + Source, + Windowing, +======= + /// A source operator, which must be at the top level. + Source, + /// A windowing operator, for moving data into a loop context. + Windowing, + /// An un-windowing operator, for moving data out of a loop context. +>>>>>>> a44ec015fac (feat(hydroflow_lang): Add `FloType`s to operators, for source check) + Unwindowing, +} diff --git a/hydroflow_lang/src/graph/ops/multiset_delta.rs b/hydroflow_lang/src/graph/ops/multiset_delta.rs index dd47834cf2b9..46e34da6f33c 100644 --- a/hydroflow_lang/src/graph/ops/multiset_delta.rs +++ b/hydroflow_lang/src/graph/ops/multiset_delta.rs @@ -46,6 +46,7 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { // https://github.com/hydro-project/hydroflow/issues/1298 // If `'tick` lifetimes are added. has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/next_stratum.rs b/hydroflow_lang/src/graph/ops/next_stratum.rs index 0a6238f5544e..318a3bcd5e6b 100644 --- a/hydroflow_lang/src/graph/ops/next_stratum.rs +++ b/hydroflow_lang/src/graph/ops/next_stratum.rs @@ -20,6 +20,7 @@ pub const NEXT_STRATUM: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/null.rs b/hydroflow_lang/src/graph/ops/null.rs index 73ae3f48e3d8..45f574e2c3d3 100644 --- a/hydroflow_lang/src/graph/ops/null.rs +++ b/hydroflow_lang/src/graph/ops/null.rs @@ -26,6 +26,7 @@ pub const NULL: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/partition.rs b/hydroflow_lang/src/graph/ops/partition.rs index 1f1fddb83512..d232ffd12e9a 100644 --- a/hydroflow_lang/src/graph/ops/partition.rs +++ b/hydroflow_lang/src/graph/ops/partition.rs @@ -65,6 +65,7 @@ pub const PARTITION: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/persist.rs b/hydroflow_lang/src/graph/ops/persist.rs index 187f571e5693..4f3f41d5decf 100644 --- a/hydroflow_lang/src/graph/ops/persist.rs +++ b/hydroflow_lang/src/graph/ops/persist.rs @@ -48,6 +48,7 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: true, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/persist_mut.rs b/hydroflow_lang/src/graph/ops/persist_mut.rs index 08c0b5f627d9..822d98b30e9e 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut.rs @@ -39,6 +39,7 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { // https://github.com/hydro-project/hydroflow/issues/1298 // If `'tick` lifetimes are added. has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs index a0cf662ed8a7..6e97e74b95cb 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs @@ -39,6 +39,7 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { // https://github.com/hydro-project/hydroflow/issues/1298 // If `'tick` lifetimes are added. has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/py_udf.rs b/hydroflow_lang/src/graph/ops/py_udf.rs index 7b2bc0adb8f8..67901a58d0c7 100644 --- a/hydroflow_lang/src/graph/ops/py_udf.rs +++ b/hydroflow_lang/src/graph/ops/py_udf.rs @@ -62,6 +62,7 @@ pub const PY_UDF: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/reduce.rs b/hydroflow_lang/src/graph/ops/reduce.rs index 53530b2d49b4..03b16c2e4c09 100644 --- a/hydroflow_lang/src/graph/ops/reduce.rs +++ b/hydroflow_lang/src/graph/ops/reduce.rs @@ -43,6 +43,7 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: true, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/reduce_keyed.rs b/hydroflow_lang/src/graph/ops/reduce_keyed.rs index 611bad6b1c6d..bf1fe2f8b4c7 100644 --- a/hydroflow_lang/src/graph/ops/reduce_keyed.rs +++ b/hydroflow_lang/src/graph/ops/reduce_keyed.rs @@ -70,6 +70,7 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { // to prevent reading uncleared data if this subgraph doesn't run. // https://github.com/hydro-project/hydroflow/issues/1298 has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/sort.rs b/hydroflow_lang/src/graph/ops/sort.rs index a516e4e6368f..e10349b180a9 100644 --- a/hydroflow_lang/src/graph/ops/sort.rs +++ b/hydroflow_lang/src/graph/ops/sort.rs @@ -27,6 +27,7 @@ pub const SORT: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/sort_by_key.rs b/hydroflow_lang/src/graph/ops/sort_by_key.rs index 967da1027f73..1e7aacb603c2 100644 --- a/hydroflow_lang/src/graph/ops/sort_by_key.rs +++ b/hydroflow_lang/src/graph/ops/sort_by_key.rs @@ -27,6 +27,7 @@ pub const SORT_BY_KEY: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/source_file.rs b/hydroflow_lang/src/graph/ops/source_file.rs index 604da35ffbfb..664e9ec89640 100644 --- a/hydroflow_lang/src/graph/ops/source_file.rs +++ b/hydroflow_lang/src/graph/ops/source_file.rs @@ -2,8 +2,8 @@ use quote::quote_spanned; use syn::parse_quote_spanned; use super::{ - make_missing_runtime_msg, OperatorCategory, OperatorConstraints, - OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1, + make_missing_runtime_msg, FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, + WriteContextArgs, RANGE_0, RANGE_1, }; /// > 0 input streams, 1 output stream @@ -30,6 +30,7 @@ pub const SOURCE_FILE: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_interval.rs b/hydroflow_lang/src/graph/ops/source_interval.rs index e1759023c7df..4f77fd526958 100644 --- a/hydroflow_lang/src/graph/ops/source_interval.rs +++ b/hydroflow_lang/src/graph/ops/source_interval.rs @@ -2,8 +2,8 @@ use quote::quote_spanned; use syn::parse_quote_spanned; use super::{ - OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, - RANGE_0, RANGE_1, + FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, + RANGE_1, }; /// > 0 input streams, 1 output stream @@ -54,6 +54,7 @@ pub const SOURCE_INTERVAL: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_iter.rs b/hydroflow_lang/src/graph/ops/source_iter.rs index 74dce7bf2d50..ca5a299b9c32 100644 --- a/hydroflow_lang/src/graph/ops/source_iter.rs +++ b/hydroflow_lang/src/graph/ops/source_iter.rs @@ -1,8 +1,7 @@ use quote::quote_spanned; use super::{ - OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, - RANGE_0, RANGE_1, + FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1 }; /// > 0 input streams, 1 output stream @@ -30,6 +29,7 @@ pub const SOURCE_ITER: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_json.rs b/hydroflow_lang/src/graph/ops/source_json.rs index 40cc97cccc60..890a25372aac 100644 --- a/hydroflow_lang/src/graph/ops/source_json.rs +++ b/hydroflow_lang/src/graph/ops/source_json.rs @@ -1,7 +1,7 @@ use quote::quote_spanned; use super::{ - OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, + FloType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1, }; @@ -27,6 +27,7 @@ pub const SOURCE_JSON: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stdin.rs b/hydroflow_lang/src/graph/ops/source_stdin.rs index af5d8a17cc67..4a788eecc9e6 100644 --- a/hydroflow_lang/src/graph/ops/source_stdin.rs +++ b/hydroflow_lang/src/graph/ops/source_stdin.rs @@ -1,8 +1,8 @@ use quote::quote_spanned; use super::{ - OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, - RANGE_0, RANGE_1, + FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, + RANGE_1, }; /// > 0 input streams, 1 output stream @@ -29,6 +29,7 @@ pub const SOURCE_STDIN: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stream.rs b/hydroflow_lang/src/graph/ops/source_stream.rs index f5610dfbeb32..8a5b7f392e3a 100644 --- a/hydroflow_lang/src/graph/ops/source_stream.rs +++ b/hydroflow_lang/src/graph/ops/source_stream.rs @@ -1,8 +1,8 @@ use quote::quote_spanned; use super::{ - OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, - RANGE_0, RANGE_1, + FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, + RANGE_1, }; /// > 0 input streams, 1 output stream @@ -36,6 +36,7 @@ pub const SOURCE_STREAM: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stream_serde.rs b/hydroflow_lang/src/graph/ops/source_stream_serde.rs index cb0d7003522f..4c4fbf51d6f8 100644 --- a/hydroflow_lang/src/graph/ops/source_stream_serde.rs +++ b/hydroflow_lang/src/graph/ops/source_stream_serde.rs @@ -1,8 +1,8 @@ use quote::quote_spanned; use super::{ - OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, - RANGE_0, RANGE_1, + FloType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, WriteContextArgs, RANGE_0, + RANGE_1, }; /// > 0 input streams, 1 output stream @@ -36,6 +36,7 @@ pub const SOURCE_STREAM_SERDE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: true, has_singleton_output: false, + flo_type: Some(FloType::Source), ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/spin.rs b/hydroflow_lang/src/graph/ops/spin.rs index b943cb09ac9e..c686d03d0c95 100644 --- a/hydroflow_lang/src/graph/ops/spin.rs +++ b/hydroflow_lang/src/graph/ops/spin.rs @@ -29,6 +29,7 @@ pub const SPIN: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/state.rs b/hydroflow_lang/src/graph/ops/state.rs index b2c830916afe..ddcf213af446 100644 --- a/hydroflow_lang/src/graph/ops/state.rs +++ b/hydroflow_lang/src/graph/ops/state.rs @@ -31,6 +31,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: true, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/state_by.rs b/hydroflow_lang/src/graph/ops/state_by.rs index f067059ab514..d676c5fc8439 100644 --- a/hydroflow_lang/src/graph/ops/state_by.rs +++ b/hydroflow_lang/src/graph/ops/state_by.rs @@ -32,6 +32,7 @@ pub const STATE_BY: OperatorConstraints = OperatorConstraints { type_args: &(0..=1), is_external_input: false, has_singleton_output: true, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/tee.rs b/hydroflow_lang/src/graph/ops/tee.rs index fcbf0a182d6d..01641044f328 100644 --- a/hydroflow_lang/src/graph/ops/tee.rs +++ b/hydroflow_lang/src/graph/ops/tee.rs @@ -28,6 +28,7 @@ pub const TEE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/union.rs b/hydroflow_lang/src/graph/ops/union.rs index dbc8e6af4077..ae59c8053164 100644 --- a/hydroflow_lang/src/graph/ops/union.rs +++ b/hydroflow_lang/src/graph/ops/union.rs @@ -32,6 +32,7 @@ pub const UNION: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/unique.rs b/hydroflow_lang/src/graph/ops/unique.rs index 8dbc63d2a111..28b44a049e4f 100644 --- a/hydroflow_lang/src/graph/ops/unique.rs +++ b/hydroflow_lang/src/graph/ops/unique.rs @@ -54,6 +54,7 @@ pub const UNIQUE: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/unzip.rs b/hydroflow_lang/src/graph/ops/unzip.rs index 272df573983d..ac3f04d21461 100644 --- a/hydroflow_lang/src/graph/ops/unzip.rs +++ b/hydroflow_lang/src/graph/ops/unzip.rs @@ -28,6 +28,7 @@ pub const UNZIP: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: None, ports_out: Some(|| super::PortListSpec::Fixed(parse_quote!(0, 1))), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/zip.rs b/hydroflow_lang/src/graph/ops/zip.rs index 1cb58f85316d..290d642cc098 100644 --- a/hydroflow_lang/src/graph/ops/zip.rs +++ b/hydroflow_lang/src/graph/ops/zip.rs @@ -30,6 +30,7 @@ pub const ZIP: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/zip_longest.rs b/hydroflow_lang/src/graph/ops/zip_longest.rs index db07f791bd55..1ed5e5c51af6 100644 --- a/hydroflow_lang/src/graph/ops/zip_longest.rs +++ b/hydroflow_lang/src/graph/ops/zip_longest.rs @@ -34,6 +34,7 @@ pub const ZIP_LONGEST: OperatorConstraints = OperatorConstraints { type_args: RANGE_0, is_external_input: false, has_singleton_output: false, + flo_type: None, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum),