diff --git a/hydroflow_plus/src/builder/built.rs b/hydroflow_plus/src/builder/built.rs index 4caf767f0a6d..440dabe41973 100644 --- a/hydroflow_plus/src/builder/built.rs +++ b/hydroflow_plus/src/builder/built.rs @@ -3,11 +3,11 @@ use std::marker::PhantomData; use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph}; +use super::compiled::HfCompiled; use super::deploy::{DeployFlow, DeployResult}; use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, LocalDeploy, ProcessSpec}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; -use crate::HfCompiled; pub struct BuiltFlow<'a> { pub(super) ir: Vec, diff --git a/hydroflow_plus/src/builder/compiled.rs b/hydroflow_plus/src/builder/compiled.rs new file mode 100644 index 000000000000..0d16884e0166 --- /dev/null +++ b/hydroflow_plus/src/builder/compiled.rs @@ -0,0 +1,123 @@ +use std::collections::BTreeMap; +use std::marker::PhantomData; + +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow_lang::graph::{partition_graph, HydroflowGraph}; +use proc_macro2::TokenStream; +use quote::quote; +use stageleft::runtime_support::FreeVariable; +use stageleft::Quoted; + +pub struct HfCompiled<'a, ID> { + pub(super) hydroflow_ir: BTreeMap, + pub(super) extra_stmts: BTreeMap>, + pub(super) _phantom: PhantomData<&'a mut &'a ID>, +} + +impl HfCompiled<'_, ID> { + pub fn hydroflow_ir(&self) -> &BTreeMap { + &self.hydroflow_ir + } + + pub fn take_ir(self) -> BTreeMap { + self.hydroflow_ir + } +} + +impl<'a> HfCompiled<'a, usize> { + pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> HfBuiltWithId<'a> { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); + quote! { #ident } + } + }; + + let mut conditioned_tokens = None; + for (subgraph_id, flat_graph) in self.hydroflow_ir { + let partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let mut diagnostics = Vec::new(); + let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics); + let my_extra_stmts = self + .extra_stmts + .get(&subgraph_id) + .cloned() + .unwrap_or_default(); + + if let Some(conditioned_tokens) = conditioned_tokens.as_mut() { + *conditioned_tokens = syn::parse_quote! { + #conditioned_tokens else if __given_id == #subgraph_id { + #(#my_extra_stmts)* + #tokens + } + }; + } else { + conditioned_tokens = Some(syn::parse_quote! { + if __given_id == #subgraph_id { + #(#my_extra_stmts)* + #tokens + } + }); + } + } + + let conditioned_tokens: TokenStream = conditioned_tokens.unwrap(); + let id = id.splice_untyped(); + HfBuiltWithId { + tokens: syn::parse_quote!({ + let __given_id = #id; + #conditioned_tokens else { + panic!("Invalid node id: {}", __given_id); + } + }), + _phantom: PhantomData, + } + } +} + +impl<'a> Quoted<'a, Hydroflow<'a>> for HfCompiled<'a, ()> {} + +impl<'a> FreeVariable> for HfCompiled<'a, ()> { + fn to_tokens(mut self) -> (Option, Option) { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); + quote! { #ident } + } + }; + + if self.hydroflow_ir.len() != 1 { + panic!("Expected exactly one subgraph in the Hydroflow IR"); + } + + let flat_graph = self.hydroflow_ir.remove(&0).unwrap(); + let partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let mut diagnostics = Vec::new(); + let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics); + + (None, Some(tokens)) + } +} + +pub struct HfBuiltWithId<'a> { + tokens: TokenStream, + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> Quoted<'a, Hydroflow<'a>> for HfBuiltWithId<'a> {} + +impl<'a> FreeVariable> for HfBuiltWithId<'a> { + fn to_tokens(self) -> (Option, Option) { + (None, Some(self.tokens)) + } +} diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index c345d7f7bf43..e441b76eaed6 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -11,13 +11,14 @@ use serde::Serialize; use stageleft::Quoted; use super::built::build_inner; +use super::compiled::HfCompiled; use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort}; use crate::ir::HfPlusLeaf; use crate::location::external_process::{ ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort, }; use crate::location::{ExternalProcess, Location, LocationId}; -use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec}; +use crate::{Cluster, ClusterSpec, Deploy, Process, ProcessSpec}; pub struct DeployFlow<'a, D: LocalDeploy<'a>> { pub(super) ir: Vec, diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 33101b554e67..5b9fabc52428 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -10,6 +10,7 @@ use crate::location::{Cluster, ExternalProcess, Process}; use crate::RuntimeContext; pub mod built; +pub mod compiled; pub mod deploy; pub struct FlowStateInner { @@ -151,8 +152,6 @@ impl<'a> FlowBuilder<'a> { } pub fn runtime_context(&self) -> RuntimeContext<'a> { - RuntimeContext { - _phantom: PhantomData, - } + RuntimeContext::new() } } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index 46539fa81d1a..bbc38afb76d8 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -2,22 +2,16 @@ stageleft::stageleft_no_entry_crate!(); -use std::collections::BTreeMap; -use std::marker::PhantomData; - -use hydroflow::scheduled::context::Context; pub use hydroflow::scheduled::graph::Hydroflow; pub use hydroflow::*; -use lang::graph::{partition_graph, HydroflowGraph}; -use proc_macro2::TokenStream; -use quote::quote; -use stageleft::runtime_support::FreeVariable; -use stageleft::Quoted; pub mod runtime_support { pub use bincode; } +pub mod runtime_context; +pub use runtime_context::RuntimeContext; + pub mod stream; pub use stream::{Bounded, Stream, Unbounded}; @@ -47,147 +41,6 @@ pub mod properties; mod staging_util; -#[derive(Clone)] -pub struct RuntimeContext<'a> { - _phantom: PhantomData<&'a mut &'a ()>, -} - -impl RuntimeContext<'_> { - pub fn new() -> Self { - Self { - _phantom: PhantomData, - } - } -} - -impl Copy for RuntimeContext<'_> {} - -impl Default for RuntimeContext<'_> { - fn default() -> Self { - Self::new() - } -} - -impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> { - fn to_tokens(self) -> (Option, Option) { - (None, Some(quote!(&context))) - } -} - -pub struct HfCompiled<'a, ID> { - hydroflow_ir: BTreeMap, - extra_stmts: BTreeMap>, - _phantom: PhantomData<&'a mut &'a ID>, -} - -impl HfCompiled<'_, ID> { - pub fn hydroflow_ir(&self) -> &BTreeMap { - &self.hydroflow_ir - } - - pub fn take_ir(self) -> BTreeMap { - self.hydroflow_ir - } -} - -impl<'a> HfCompiled<'a, usize> { - pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> HfBuiltWithId<'a> { - let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") - .expect("hydroflow_plus should be present in `Cargo.toml`"); - let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); - quote! { #ident } - } - }; - - let mut conditioned_tokens = None; - for (subgraph_id, flat_graph) in self.hydroflow_ir { - let partitioned_graph = - partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); - - let mut diagnostics = Vec::new(); - let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics); - let my_extra_stmts = self - .extra_stmts - .get(&subgraph_id) - .cloned() - .unwrap_or_default(); - - if let Some(conditioned_tokens) = conditioned_tokens.as_mut() { - *conditioned_tokens = syn::parse_quote! { - #conditioned_tokens else if __given_id == #subgraph_id { - #(#my_extra_stmts)* - #tokens - } - }; - } else { - conditioned_tokens = Some(syn::parse_quote! { - if __given_id == #subgraph_id { - #(#my_extra_stmts)* - #tokens - } - }); - } - } - - let conditioned_tokens: TokenStream = conditioned_tokens.unwrap(); - let id = id.splice_untyped(); - HfBuiltWithId { - tokens: syn::parse_quote!({ - let __given_id = #id; - #conditioned_tokens else { - panic!("Invalid node id: {}", __given_id); - } - }), - _phantom: PhantomData, - } - } -} - -impl<'a> Quoted<'a, Hydroflow<'a>> for HfCompiled<'a, ()> {} - -impl<'a> FreeVariable> for HfCompiled<'a, ()> { - fn to_tokens(mut self) -> (Option, Option) { - let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") - .expect("hydroflow_plus should be present in `Cargo.toml`"); - let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); - quote! { #ident } - } - }; - - if self.hydroflow_ir.len() != 1 { - panic!("Expected exactly one subgraph in the Hydroflow IR"); - } - - let flat_graph = self.hydroflow_ir.remove(&0).unwrap(); - let partitioned_graph = - partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); - - let mut diagnostics = Vec::new(); - let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics); - - (None, Some(tokens)) - } -} - -pub struct HfBuiltWithId<'a> { - tokens: TokenStream, - _phantom: PhantomData<&'a mut &'a ()>, -} - -impl<'a> Quoted<'a, Hydroflow<'a>> for HfBuiltWithId<'a> {} - -impl<'a> FreeVariable> for HfBuiltWithId<'a> { - fn to_tokens(self) -> (Option, Option) { - (None, Some(self.tokens)) - } -} - #[stageleft::runtime] #[cfg(test)] mod tests { diff --git a/hydroflow_plus/src/runtime_context.rs b/hydroflow_plus/src/runtime_context.rs new file mode 100644 index 000000000000..2732b87339a3 --- /dev/null +++ b/hydroflow_plus/src/runtime_context.rs @@ -0,0 +1,33 @@ +use std::marker::PhantomData; + +use hydroflow::scheduled::context::Context; +use proc_macro2::TokenStream; +use quote::quote; +use stageleft::runtime_support::FreeVariable; + +#[derive(Clone)] +pub struct RuntimeContext<'a> { + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl RuntimeContext<'_> { + pub fn new() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Copy for RuntimeContext<'_> {} + +impl Default for RuntimeContext<'_> { + fn default() -> Self { + Self::new() + } +} + +impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> { + fn to_tokens(self) -> (Option, Option) { + (None, Some(quote!(&context))) + } +}