From fb6444b1dd6125b11bea5a09e5f7e6937276f0c8 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 6 Oct 2023 13:01:23 -0700 Subject: [PATCH] feat(hydroflow): prototype a functional surface syntax using staging --- .vscode/settings.json | 5 +- Cargo.lock | 103 ++++- Cargo.toml | 8 + hydroflow/examples/rga/adjacency.rs | 2 +- hydroflow/examples/rga/datalog.rs | 2 +- hydroflow/examples/rga/datalog_agg.rs | 2 +- hydroflow/examples/rga/minimal.rs | 2 +- hydroflow/examples/shopping/flows/bp_flow.rs | 2 +- .../shopping/flows/client_state_flow.rs | 2 +- .../examples/shopping/flows/listener_flow.rs | 2 +- .../examples/shopping/flows/orig_flow.rs | 2 +- .../shopping/flows/push_group_flow.rs | 2 +- .../shopping/flows/rep_server_flow.rs | 2 +- .../shopping/flows/server_state_flow.rs | 2 +- .../examples/shopping/flows/ssiv_flow.rs | 2 +- hydroflow/src/scheduled/graph.rs | 22 +- hydroflow/src/scheduled/graph_ext.rs | 2 +- hydroflow/src/scheduled/net/mod.rs | 2 +- hydroflow/src/scheduled/net/network_vertex.rs | 2 +- hydroflow/src/scheduled/query.rs | 26 +- hydroflow/src/util/cli.rs | 2 +- hydroflow_plus/Cargo.toml | 24 ++ hydroflow_plus/src/lib.rs | 324 ++++++++++++++ hydroflow_plus_kvs/Cargo.toml | 18 + hydroflow_plus_kvs/build.rs | 5 + hydroflow_plus_kvs/src/lib.rs | 9 + hydroflow_plus_kvs_flow/Cargo.toml | 14 + hydroflow_plus_kvs_flow/src/lib.rs | 51 +++ hydroflow_plus_kvs_macro/Cargo.toml | 19 + hydroflow_plus_kvs_macro/build.rs | 8 + hydroflow_plus_kvs_macro/src/lib.rs | 5 + hydroflow_plus_kvs_server/Cargo.toml | 11 + hydroflow_plus_kvs_server/src/main.rs | 8 + stagefright/Cargo.toml | 18 + stagefright/src/lib.rs | 155 +++++++ stagefright/src/runtime_support.rs | 163 ++++++++ stagefright_macro/Cargo.toml | 19 + stagefright_macro/src/free_variable/mod.rs | 231 ++++++++++ .../src/free_variable/prelude.rs | 72 ++++ stagefright_macro/src/lib.rs | 394 ++++++++++++++++++ stagefright_tool/Cargo.toml | 19 + stagefright_tool/src/lib.rs | 219 ++++++++++ website_playground/src/lib.rs | 6 +- 43 files changed, 1943 insertions(+), 45 deletions(-) create mode 100644 hydroflow_plus/Cargo.toml create mode 100644 hydroflow_plus/src/lib.rs create mode 100644 hydroflow_plus_kvs/Cargo.toml create mode 100644 hydroflow_plus_kvs/build.rs create mode 100644 hydroflow_plus_kvs/src/lib.rs create mode 100644 hydroflow_plus_kvs_flow/Cargo.toml create mode 100644 hydroflow_plus_kvs_flow/src/lib.rs create mode 100644 hydroflow_plus_kvs_macro/Cargo.toml create mode 100644 hydroflow_plus_kvs_macro/build.rs create mode 100644 hydroflow_plus_kvs_macro/src/lib.rs create mode 100644 hydroflow_plus_kvs_server/Cargo.toml create mode 100644 hydroflow_plus_kvs_server/src/main.rs create mode 100644 stagefright/Cargo.toml create mode 100644 stagefright/src/lib.rs create mode 100644 stagefright/src/runtime_support.rs create mode 100644 stagefright_macro/Cargo.toml create mode 100644 stagefright_macro/src/free_variable/mod.rs create mode 100644 stagefright_macro/src/free_variable/prelude.rs create mode 100644 stagefright_macro/src/lib.rs create mode 100644 stagefright_tool/Cargo.toml create mode 100644 stagefright_tool/src/lib.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 09bd82cadee..d462ae19568 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,5 +8,8 @@ "INSTA_FORCE_PASS": "1" } } - ] + ], + "files.watcherExclude": { + "**/target": true + } } diff --git a/Cargo.lock b/Cargo.lock index 650277cea8e..48283a7fb55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1461,6 +1461,63 @@ dependencies = [ "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus" +version = "0.4.0" +dependencies = [ + "hydroflow", + "hydroflow_lang", + "proc-macro-crate", + "proc-macro2", + "quote", + "stagefright", + "syn 2.0.14", +] + +[[package]] +name = "hydroflow_plus_kvs" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs_macro", + "regex", + "serde", + "stagefright", + "stagefright_tool", +] + +[[package]] +name = "hydroflow_plus_kvs_flow" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "regex", + "serde", + "stagefright", +] + +[[package]] +name = "hydroflow_plus_kvs_macro" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs_flow", + "regex", + "serde", + "stagefright", + "stagefright_tool", +] + +[[package]] +name = "hydroflow_plus_kvs_runtime" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_kvs", + "regex", + "serde", +] + [[package]] name = "iana-time-zone" version = "0.1.56" @@ -2550,7 +2607,7 @@ dependencies = [ "serde", "serde_json", "syn 1.0.109", - "syn-inline-mod", + "syn-inline-mod 0.5.0", "tempfile", "tree-sitter", "tree-sitter-cli", @@ -2813,6 +2870,40 @@ dependencies = [ "parking_lot 0.11.2", ] +[[package]] +name = "stagefright" +version = "0.4.0" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "stagefright_macro", + "syn 2.0.14", +] + +[[package]] +name = "stagefright_macro" +version = "0.4.0" +dependencies = [ + "lazy_static", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", +] + +[[package]] +name = "stagefright_tool" +version = "0.4.0" +dependencies = [ + "lazy_static", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", + "syn-inline-mod 0.6.0", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -2863,6 +2954,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "syn-inline-mod" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fa6dca1fdb7b2ed46dd534a326725419d4fb10f23d8c85a8b2860e5eb25d0f9" +dependencies = [ + "proc-macro2", + "syn 2.0.14", +] + [[package]] name = "synstructure" version = "0.12.6" diff --git a/Cargo.toml b/Cargo.toml index 0929d37cd47..6687bd1e7e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,18 @@ members = [ "hydroflow_datalog_core", "hydroflow_lang", "hydroflow_macro", + "hydroflow_plus", + "hydroflow_plus_kvs_flow", + "hydroflow_plus_kvs_macro", + "hydroflow_plus_kvs", + "hydroflow_plus_kvs_server", "lattices", "multiplatform_test", "pusherator", "relalg", + "stagefright", + "stagefright_macro", + "stagefright_tool", "variadics", "website_playground", ] diff --git a/hydroflow/examples/rga/adjacency.rs b/hydroflow/examples/rga/adjacency.rs index 546e0b524ce..5f8d0e74a4a 100644 --- a/hydroflow/examples/rga/adjacency.rs +++ b/hydroflow/examples/rga/adjacency.rs @@ -11,7 +11,7 @@ pub(crate) fn rga_adjacency( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv) -> tee(); diff --git a/hydroflow/examples/rga/datalog.rs b/hydroflow/examples/rga/datalog.rs index 362d9b7ccc7..787a6df93f1 100644 --- a/hydroflow/examples/rga/datalog.rs +++ b/hydroflow/examples/rga/datalog.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp) | (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/datalog_agg.rs b/hydroflow/examples/rga/datalog_agg.rs index 4739012c08c..6b7109e03de 100644 --- a/hydroflow/examples/rga/datalog_agg.rs +++ b/hydroflow/examples/rga/datalog_agg.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_datalog_agg( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { edges = source_stream(input_recv) -> tee(); insertAfter = edges -> map(|(c, p): (Token, Timestamp)| (c.ts, p)) -> tee(); diff --git a/hydroflow/examples/rga/minimal.rs b/hydroflow/examples/rga/minimal.rs index e78032a1bff..23b3583fa5c 100644 --- a/hydroflow/examples/rga/minimal.rs +++ b/hydroflow/examples/rga/minimal.rs @@ -9,7 +9,7 @@ pub(crate) fn rga_minimal( input_recv: UnboundedReceiverStream<(Token, Timestamp)>, rga_send: UnboundedSender<(Token, Timestamp)>, _list_send: UnboundedSender<(Timestamp, Timestamp)>, -) -> Hydroflow { +) -> Hydroflow<'static> { hydroflow_syntax! { insertAfter = source_stream(input_recv); diff --git a/hydroflow/examples/shopping/flows/bp_flow.rs b/hydroflow/examples/shopping/flows/bp_flow.rs index 93e8e8a6f08..960dfffca80 100644 --- a/hydroflow/examples/shopping/flows/bp_flow.rs +++ b/hydroflow/examples/shopping/flows/bp_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn bp_flow( shopping_bp: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/client_state_flow.rs b/hydroflow/examples/shopping/flows/client_state_flow.rs index 0d12ed44ed4..8d4cdb511b9 100644 --- a/hydroflow/examples/shopping/flows/client_state_flow.rs +++ b/hydroflow/examples/shopping/flows/client_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn client_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/listener_flow.rs b/hydroflow/examples/shopping/flows/listener_flow.rs index ff4114bbee6..0ab38af0b8c 100644 --- a/hydroflow/examples/shopping/flows/listener_flow.rs +++ b/hydroflow/examples/shopping/flows/listener_flow.rs @@ -11,7 +11,7 @@ pub(crate) async fn listener_flow( tuple_input: UdpStream, bp_input: UdpStream, ssiv_input: UdpStream, -) -> Hydroflow { +) -> Hydroflow<'static> { // Simply print what we receive. hydroflow_syntax! { source_stream_serde(tuple_input) diff --git a/hydroflow/examples/shopping/flows/orig_flow.rs b/hydroflow/examples/shopping/flows/orig_flow.rs index d5803399619..c92f080c813 100644 --- a/hydroflow/examples/shopping/flows/orig_flow.rs +++ b/hydroflow/examples/shopping/flows/orig_flow.rs @@ -14,7 +14,7 @@ pub(crate) async fn orig_flow( shopping: impl Iterator + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // This is the straightforward single-transducer sequential case. diff --git a/hydroflow/examples/shopping/flows/push_group_flow.rs b/hydroflow/examples/shopping/flows/push_group_flow.rs index 4f22e3a0c96..210f567a8dd 100644 --- a/hydroflow/examples/shopping/flows/push_group_flow.rs +++ b/hydroflow/examples/shopping/flows/push_group_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn push_group_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/rep_server_flow.rs b/hydroflow/examples/shopping/flows/rep_server_flow.rs index 26b8e446e11..8b62825b7a6 100644 --- a/hydroflow/examples/shopping/flows/rep_server_flow.rs +++ b/hydroflow/examples/shopping/flows/rep_server_flow.rs @@ -20,7 +20,7 @@ pub(crate) async fn rep_server_flow( remote_addr: SocketAddr, gossip_addr: SocketAddr, server_addrs: impl Iterator + 'static, -) -> Hydroflow { +) -> Hydroflow<'static> { let (broadcast_out, broadcast_in, _) = hydroflow::util::bind_udp_bytes(gossip_addr).await; let client_class = client_class_iter(); let ssiv_merge = diff --git a/hydroflow/examples/shopping/flows/server_state_flow.rs b/hydroflow/examples/shopping/flows/server_state_flow.rs index d20416b654d..9b1bcde8640 100644 --- a/hydroflow/examples/shopping/flows/server_state_flow.rs +++ b/hydroflow/examples/shopping/flows/server_state_flow.rs @@ -18,7 +18,7 @@ pub(crate) async fn server_state_flow( out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/examples/shopping/flows/ssiv_flow.rs b/hydroflow/examples/shopping/flows/ssiv_flow.rs index 154d224c2cc..34ddfefc516 100644 --- a/hydroflow/examples/shopping/flows/ssiv_flow.rs +++ b/hydroflow/examples/shopping/flows/ssiv_flow.rs @@ -16,7 +16,7 @@ pub(crate) async fn ssiv_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, -) -> Hydroflow { +) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 47afe68eb0d..e4745c1a195 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -24,8 +24,8 @@ use super::{HandoffId, SubgraphId}; use crate::Never; /// A Hydroflow graph. Owns, schedules, and runs the compiled subgraphs. -pub struct Hydroflow { - pub(super) subgraphs: Vec, +pub struct Hydroflow<'a> { + pub(super) subgraphs: Vec>, pub(super) context: Context, handoffs: Vec, @@ -44,7 +44,7 @@ pub struct Hydroflow { /// See [`Self::diagnostics()`]. diagnostics: Option>>, } -impl Default for Hydroflow { +impl<'a> Default for Hydroflow<'a> { fn default() -> Self { let stratum_queues = vec![Default::default()]; // Always initialize stratum #0. let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl Default for Hydroflow { } } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Create a new empty Hydroflow graph. pub fn new() -> Self { Default::default() @@ -507,7 +507,7 @@ impl Hydroflow { Name: Into>, R: 'static + PortList, W: 'static + PortList, - F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), + F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), { let sg_id = SubgraphId(self.subgraphs.len()); @@ -675,7 +675,7 @@ impl Hydroflow { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { /// Alias for [`Context::spawn_task`]. pub fn spawn_task(&mut self, future: Fut) where @@ -695,7 +695,7 @@ impl Hydroflow { } } -impl Drop for Hydroflow { +impl<'a> Drop for Hydroflow<'a> { fn drop(&mut self) { self.abort_tasks(); } @@ -740,14 +740,14 @@ impl HandoffData { /// /// Used internally by the [Hydroflow] struct to represent the dataflow graph /// structure and scheduled state. -pub(super) struct SubgraphData { +pub(super) struct SubgraphData<'a> { /// A friendly name for diagnostics. #[allow(dead_code)] // TODO(mingwei): remove attr once used. pub(super) name: Cow<'static, str>, /// This subgraph's stratum number. pub(super) stratum: usize, /// The actual execution code of the subgraph. - subgraph: Box, + subgraph: Box, #[allow(dead_code)] preds: Vec, succs: Vec, @@ -761,11 +761,11 @@ pub(super) struct SubgraphData { /// Keep track of the last tick that this subgraph was run in last_tick_run_in: Option, } -impl SubgraphData { +impl<'a> SubgraphData<'a> { pub fn new( name: Cow<'static, str>, stratum: usize, - subgraph: impl 'static + Subgraph, + subgraph: impl Subgraph + 'a, preds: Vec, succs: Vec, is_scheduled: bool, diff --git a/hydroflow/src/scheduled/graph_ext.rs b/hydroflow/src/scheduled/graph_ext.rs index db8b48726e0..f1245516a15 100644 --- a/hydroflow/src/scheduled/graph_ext.rs +++ b/hydroflow/src/scheduled/graph_ext.rs @@ -125,7 +125,7 @@ pub trait GraphExt { W: 'static + Handoff + CanReceive; } -impl GraphExt for Hydroflow { +impl<'a> GraphExt for Hydroflow<'a> { subgraph_ext!(impl add_subgraph_sink, (recv_port: R), ()); subgraph_ext!( impl add_subgraph_2sink, diff --git a/hydroflow/src/scheduled/net/mod.rs b/hydroflow/src/scheduled/net/mod.rs index ae1fbde936f..07309c81446 100644 --- a/hydroflow/src/scheduled/net/mod.rs +++ b/hydroflow/src/scheduled/net/mod.rs @@ -96,7 +96,7 @@ impl Message { } } -impl Hydroflow { +impl<'a> Hydroflow<'a> { fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort> { let reader = FramedRead::new(reader, LengthDelimitedCodec::new()); let (send_port, recv_port) = self.make_edge("tcp ingress handoff"); diff --git a/hydroflow/src/scheduled/net/network_vertex.rs b/hydroflow/src/scheduled/net/network_vertex.rs index 508437a2df5..13077e6290e 100644 --- a/hydroflow/src/scheduled/net/network_vertex.rs +++ b/hydroflow/src/scheduled/net/network_vertex.rs @@ -18,7 +18,7 @@ pub type Address = String; // These methods can't be wrapped up in a trait because async methods are not // allowed in traits (yet). -impl Hydroflow { +impl<'a> Hydroflow<'a> { // TODO(justin): document these, but they're derivatives of inbound_tcp_vertex_internal. pub async fn inbound_tcp_vertex_port(&mut self, port: u16) -> RecvPort> where diff --git a/hydroflow/src/scheduled/query.rs b/hydroflow/src/scheduled/query.rs index 04da7d34704..af9b0d6cc4c 100644 --- a/hydroflow/src/scheduled/query.rs +++ b/hydroflow/src/scheduled/query.rs @@ -15,16 +15,16 @@ use crate::scheduled::handoff::VecHandoff; const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff"); #[derive(Default)] -pub struct Query { - df: Rc>, +pub struct Query<'a> { + df: Rc>>, } -impl Query { +impl<'a> Query<'a> { pub fn new() -> Self { Default::default() } - pub fn source(&mut self, f: F) -> Operator + pub fn source(&mut self, f: F) -> Operator<'a, T> where T: 'static, F: 'static + FnMut(&Context, &SendCtx>), @@ -40,7 +40,7 @@ impl Query { } } - pub fn concat(&mut self, ops: Vec>) -> Operator + pub fn concat(&mut self, ops: Vec>) -> Operator<'a, T> where T: 'static, { @@ -69,19 +69,19 @@ impl Query { } } -pub struct Operator +pub struct Operator<'a, T> where T: 'static, { - df: Rc>, + df: Rc>>, recv_port: RecvPort>, } -impl Operator +impl<'a, T> Operator<'a, T> where T: 'static, { - pub fn map(self, mut f: F) -> Operator + pub fn map(self, mut f: F) -> Operator<'a, U> where F: 'static + Fn(T) -> U, U: 'static, @@ -101,7 +101,7 @@ where } #[must_use] - pub fn filter(self, mut f: F) -> Operator + pub fn filter(self, mut f: F) -> Operator<'a, T> where F: 'static + Fn(&T) -> bool, { @@ -125,7 +125,7 @@ where } #[must_use] - pub fn concat(self, other: Operator) -> Operator { + pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> { // TODO(justin): this is very slow. let mut df = self.df.borrow_mut(); @@ -163,8 +163,8 @@ where } } -impl Operator { - pub fn tee(self, n: usize) -> Vec> +impl<'a, T: Clone> Operator<'a, T> { + pub fn tee(self, n: usize) -> Vec> where T: 'static, { diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 90928acd8a0..c5f359fa0af 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -6,7 +6,7 @@ pub use hydroflow_cli_integration::*; use crate::scheduled::graph::Hydroflow; -pub async fn launch_flow(mut flow: Hydroflow) { +pub async fn launch_flow(mut flow: Hydroflow<'_>) { let stop = tokio::sync::oneshot::channel(); tokio::task::spawn_blocking(|| { let mut line = String::new(); diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml new file mode 100644 index 00000000000..c2f2394d802 --- /dev/null +++ b/hydroflow_plus/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "hydroflow_plus" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus/" +description = "Functional programming API for hydroflow" + +[lib] +path = "src/lib.rs" + +[features] +default = [] +diagnostics = [ "hydroflow_lang/diagnostics" ] + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +hydroflow = { path = "../hydroflow", version = "^0.4.0" } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.4.0" } +stagefright = { path = "../stagefright", version = "^0.4.0" } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs new file mode 100644 index 00000000000..21c13bf1827 --- /dev/null +++ b/hydroflow_plus/src/lib.rs @@ -0,0 +1,324 @@ +use std::cell::RefCell; +use std::marker::PhantomData; + +pub use hydroflow; +use hydroflow::futures::stream::Stream; +use hydroflow::scheduled::context::Context; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow_lang::graph::{partition_graph, propegate_flow_props, FlatGraphBuilder}; +use proc_macro2::{Span, TokenStream}; +use quote::{quote, ToTokens}; +use stagefright::runtime_support::{FreeVariable, ToFreeVariableTokens}; +use stagefright::{IntoQuotedMut, IntoQuotedOnce, Quoted, QuotedContext}; +use syn::parse_quote; + +pub fn hydroflow_build(f: impl FnOnce() -> FlatGraphBuilder) -> TokenStream { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident::hydroflow } + } + }; + + let (flat_graph, _, _) = f().build(); + let mut partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let mut diagnostics = Vec::new(); + // Propgeate flow properties throughout the graph. + // TODO(mingwei): Should this be done at a flat graph stage instead? + let _ = propegate_flow_props::propegate_flow_props(&mut partitioned_graph, &mut diagnostics); + + partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics) +} + +#[derive(Clone)] +pub struct RuntimeContext<'a> { + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl Copy for RuntimeContext<'_> {} + +impl<'a> ToFreeVariableTokens for RuntimeContext<'a> { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(&context))) + } +} + +impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> {} + +pub struct HfBuilt<'a> { + tokens: TokenStream, + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> Quoted> for HfBuilt<'a> {} + +impl<'a> ToTokens for HfBuilt<'a> { + fn to_tokens(&self, tokens: &mut TokenStream) { + self.tokens.to_tokens(tokens); + } +} + +pub struct HfBuilder<'a> { + next_id: RefCell, + pub(crate) builder: RefCell>, + _phantom: PhantomData<&'a mut &'a ()>, +} + +impl<'a> QuotedContext for HfBuilder<'a> { + fn create() -> Self { + HfBuilder::new() + } +} + +impl<'a> HfBuilder<'a> { + #[allow(clippy::new_without_default)] + pub fn new() -> HfBuilder<'a> { + HfBuilder { + next_id: RefCell::new(0), + builder: RefCell::new(Some(FlatGraphBuilder::new())), + _phantom: PhantomData, + } + } + + pub fn build(&self) -> HfBuilt<'a> { + let builder = self.builder.borrow_mut().take().unwrap(); + HfBuilt { + tokens: hydroflow_build(|| builder), + _phantom: PhantomData, + } + } + + pub fn runtime_context(&self) -> RuntimeContext<'a> { + RuntimeContext { + _phantom: PhantomData, + } + } + + pub fn source_stream + Unpin>( + &'a self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.to_quoted(); + + self.builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_stream(#e) -> tee(); + }); + + HfStream { + ident, + graph: self, + _phantom: PhantomData, + } + } + + pub fn source_iter>( + &'a self, + e: impl IntoQuotedOnce<'a, E>, + ) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + let e = e.to_quoted(); + + self.builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + + HfStream { + ident, + graph: self, + _phantom: PhantomData, + } + } +} + +pub struct HfStream<'a, T> { + ident: syn::Ident, + graph: &'a HfBuilder<'a>, + _phantom: PhantomData<&'a mut &'a T>, +} + +impl<'a, T> HfStream<'a, T> { + pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("map_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> map(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn filter bool + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("filter_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> filter(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn filter_map Option + 'a>( + &self, + f: impl IntoQuotedMut<'a, F>, + ) -> HfStream<'a, U> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("filter_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> filter_map(#f) -> tee(); + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn for_each(&self, f: impl IntoQuotedMut<'a, F>) { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + let f = f.to_quoted(); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> for_each(#f); + }); + } +} + +impl<'a, K, V1> HfStream<'a, (K, V1)> { + pub fn join(&'a self, n: &HfStream<(K, V2)>) -> HfStream<(K, (V1, V2))> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let self_ident = &self.ident; + let other_ident = &n.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = join() -> tee(); + }); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #self_ident -> [0]#ident; + }); + + self.graph + .builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #other_ident -> [1]#ident; + }); + + HfStream { + ident, + graph: self.graph, + _phantom: PhantomData, + } + } +} diff --git a/hydroflow_plus_kvs/Cargo.toml b/hydroflow_plus_kvs/Cargo.toml new file mode 100644 index 00000000000..1e5d0e7ac2e --- /dev/null +++ b/hydroflow_plus_kvs/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "hydroflow_plus_kvs" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +stagefright = { path = "../stagefright", version = "^0.4.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs_macro = { path = "../hydroflow_plus_kvs_macro" } + +[build-dependencies] +stagefright_tool = { path = "../stagefright_tool", version = "^0.4.0" } diff --git a/hydroflow_plus_kvs/build.rs b/hydroflow_plus_kvs/build.rs new file mode 100644 index 00000000000..c3d3085f3e6 --- /dev/null +++ b/hydroflow_plus_kvs/build.rs @@ -0,0 +1,5 @@ +use std::path::Path; + +fn main() { + stagefright_tool::gen_final(Path::new("../hydroflow_plus_kvs_flow")); +} diff --git a/hydroflow_plus_kvs/src/lib.rs b/hydroflow_plus_kvs/src/lib.rs new file mode 100644 index 00000000000..26d2eac985f --- /dev/null +++ b/hydroflow_plus_kvs/src/lib.rs @@ -0,0 +1,9 @@ +#![allow(unused)] + +pub(crate) use hydroflow_plus_kvs_macro as __macro; + +pub mod __flow { + include!(concat!(env!("OUT_DIR"), "/lib_pub.rs")); +} + +include!(concat!(env!("OUT_DIR"), "/lib.rs")); diff --git a/hydroflow_plus_kvs_flow/Cargo.toml b/hydroflow_plus_kvs_flow/Cargo.toml new file mode 100644 index 00000000000..7fb49309186 --- /dev/null +++ b/hydroflow_plus_kvs_flow/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "hydroflow_plus_kvs_flow" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +stagefright = { path = "../stagefright", version = "^0.4.0" } +regex = "1" +serde = "1" diff --git a/hydroflow_plus_kvs_flow/src/lib.rs b/hydroflow_plus_kvs_flow/src/lib.rs new file mode 100644 index 00000000000..322e4f321fd --- /dev/null +++ b/hydroflow_plus_kvs_flow/src/lib.rs @@ -0,0 +1,51 @@ +use hydroflow_plus::hydroflow::bytes::Bytes; +use hydroflow_plus::hydroflow::scheduled::graph::Hydroflow; +use hydroflow_plus::hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow_plus::hydroflow::util; +use hydroflow_plus::HfBuilder; +use serde::{Deserialize, Serialize}; +use stagefright::{q, Quoted, RuntimeData}; + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub enum KVSMessage { + Put { key: String, value: String }, + Get { key: String }, + Response { key: String, value: String }, +} + +#[stagefright::entry] +pub fn my_example_flow<'a>( + graph: &'a HfBuilder<'a>, + debug: u32, + input_stream: RuntimeData>, +) -> impl Quoted> { + let inbound_channel = + graph + .source_stream(q!(input_stream)) + .map(q!(|bytes| util::deserialize_from_bytes::( + bytes + ) + .unwrap())); + + let gets = inbound_channel.filter_map(q!(|msg| match msg { + KVSMessage::Get { key } => Some(key), + _ => None, + })); + + let puts = inbound_channel.filter_map(q!(|msg| match msg { + KVSMessage::Put { key, value } => Some((key, value)), + _ => None, + })); + + if debug == 1 { + puts.for_each(q!(|msg| { + println!("Got a Put {:?}", msg); + })); + + gets.for_each(q!(|msg| { + println!("Got a Get {:?}", msg); + })); + } + + graph.build() +} diff --git a/hydroflow_plus_kvs_macro/Cargo.toml b/hydroflow_plus_kvs_macro/Cargo.toml new file mode 100644 index 00000000000..709913db436 --- /dev/null +++ b/hydroflow_plus_kvs_macro/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "hydroflow_plus_kvs_macro" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +stagefright = { path = "../stagefright", version = "^0.4.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs_flow = { path = "../hydroflow_plus_kvs_flow" } + +[build-dependencies] +stagefright_tool = { path = "../stagefright_tool", version = "^0.4.0" } diff --git a/hydroflow_plus_kvs_macro/build.rs b/hydroflow_plus_kvs_macro/build.rs new file mode 100644 index 00000000000..083c11f1716 --- /dev/null +++ b/hydroflow_plus_kvs_macro/build.rs @@ -0,0 +1,8 @@ +use std::path::Path; + +fn main() { + stagefright_tool::gen_macro( + Path::new("../hydroflow_plus_kvs_flow"), + "hydroflow_plus_kvs", + ); +} diff --git a/hydroflow_plus_kvs_macro/src/lib.rs b/hydroflow_plus_kvs_macro/src/lib.rs new file mode 100644 index 00000000000..cc6ef076ae3 --- /dev/null +++ b/hydroflow_plus_kvs_macro/src/lib.rs @@ -0,0 +1,5 @@ +#![allow(unused)] + +pub(crate) use hydroflow_plus_kvs_flow as __flow; + +include!(concat!(env!("OUT_DIR"), "/lib.rs")); diff --git a/hydroflow_plus_kvs_server/Cargo.toml b/hydroflow_plus_kvs_server/Cargo.toml new file mode 100644 index 00000000000..ec00307ae27 --- /dev/null +++ b/hydroflow_plus_kvs_server/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "hydroflow_plus_kvs_runtime" +publish = false +version = "0.0.0" +edition = "2021" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +regex = "1" +serde = "1" +hydroflow_plus_kvs = { path = "../hydroflow_plus_kvs" } diff --git a/hydroflow_plus_kvs_server/src/main.rs b/hydroflow_plus_kvs_server/src/main.rs new file mode 100644 index 00000000000..ee2a469a32a --- /dev/null +++ b/hydroflow_plus_kvs_server/src/main.rs @@ -0,0 +1,8 @@ +use hydroflow_plus::hydroflow::bytes::Bytes; +use hydroflow_plus_kvs::my_example_flow; + +fn main() { + let (_send, recv) = hydroflow_plus::hydroflow::util::unbounded_channel::(); + let mut flow = my_example_flow!(1, recv); + flow.run_tick(); +} diff --git a/stagefright/Cargo.toml b/stagefright/Cargo.toml new file mode 100644 index 00000000000..c86e07fff75 --- /dev/null +++ b/stagefright/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "stagefright" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stagefright/" +description = "Type-safe staged programming for Rust" + +[lib] +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +stagefright_macro = { path = "../stagefright_macro", version = "^0.4.0" } diff --git a/stagefright/src/lib.rs b/stagefright/src/lib.rs new file mode 100644 index 00000000000..5f36dc5d268 --- /dev/null +++ b/stagefright/src/lib.rs @@ -0,0 +1,155 @@ +use std::marker::PhantomData; + +use proc_macro2::{Span, TokenStream}; +use quote::{quote, ToTokens}; + +pub mod internal { + pub use proc_macro2::TokenStream; + pub use quote::quote; + pub use syn; +} + +pub use stagefright_macro::{entry, q, quse_fn}; + +pub mod runtime_support; +use runtime_support::{FreeVariable, ToFreeVariableTokens, CURRENT_FINAL_CRATE}; + +pub trait QuotedContext { + fn create() -> Self; +} + +pub trait Quoted: Sized + ToTokens { + fn build(self) -> TokenStream { + ToTokens::into_token_stream(self) + } +} + +type FreeVariables = Vec<(String, (Option, Option))>; + +pub trait IntoQuotedOnce<'a, T>: + FnOnce(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ + fn to_quoted(self) -> QuotedExpr { + let mut module_path = String::new(); + let mut str = String::new(); + let mut free_variables = Vec::new(); + // this is an uninit value so we can't drop it + std::mem::forget(self(&mut module_path, &mut str, &mut free_variables, false)); + QuotedExpr::create(module_path, &str, free_variables) + } +} + +impl<'a, T, F: FnOnce(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a> + IntoQuotedOnce<'a, T> for F +{ +} + +pub trait IntoQuotedMut<'a, T>: + FnMut(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a +where + Self: Sized, +{ + fn to_quoted(mut self) -> QuotedExpr { + let mut module_path = String::new(); + let mut str = String::new(); + let mut free_variables = Vec::new(); + // this is an uninit value so we can't drop it + std::mem::forget(self(&mut module_path, &mut str, &mut free_variables, false)); + QuotedExpr::create(module_path, &str, free_variables) + } +} + +impl<'a, T, F: FnMut(&mut String, &mut String, &mut FreeVariables, bool) -> T + 'a> + IntoQuotedMut<'a, T> for F +{ +} + +pub struct QuotedExpr { + module_path: syn::Path, + expr: syn::Expr, + free_variables: FreeVariables, + _phantom: PhantomData, +} + +impl QuotedExpr { + pub fn create(module_path: String, expr: &str, free_variables: FreeVariables) -> QuotedExpr { + let module_path = syn::parse_str(&module_path).unwrap(); + let expr = syn::parse_str(expr).unwrap(); + QuotedExpr { + module_path, + expr, + free_variables, + _phantom: PhantomData, + } + } +} + +impl Quoted for QuotedExpr {} + +impl ToTokens for QuotedExpr { + fn to_tokens(&self, tokens: &mut TokenStream) { + let instantiated_free_variables = self.free_variables.iter().flat_map(|(ident, value)| { + let ident = syn::Ident::new(ident, Span::call_site()); + value.0.iter().map(|prelude| quote!(#prelude)).chain( + value + .1 + .iter() + .map(move |value| quote!(let #ident = #value;)), + ) + }); + + let final_crate = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate_path: syn::Path = syn::parse_str(final_crate).unwrap(); + + let module_path = &self.module_path; + let module_path = module_path.segments.iter().skip(1).collect::>(); + let module_path = syn::Path { + leading_colon: None, + segments: syn::punctuated::Punctuated::from_iter(module_path.into_iter().cloned()), + }; + + let expr = &self.expr; + tokens.extend(quote!({ + use ::#final_crate_path::__flow::#module_path *; + #(#instantiated_free_variables)* + #expr + })); + } +} + +pub struct RuntimeData { + ident: &'static str, + _phantom: PhantomData, +} + +impl Copy for RuntimeData {} + +impl Clone for RuntimeData { + fn clone(&self) -> Self { + // TODO(shadaj): mark this as cloned so we clone it in the splice + RuntimeData { + ident: self.ident.clone(), + _phantom: PhantomData, + } + } +} + +impl RuntimeData { + pub fn new(ident: &'static str) -> RuntimeData { + RuntimeData { + ident, + _phantom: PhantomData, + } + } +} + +impl ToFreeVariableTokens for RuntimeData { + fn to_tokens(&self) -> (Option, Option) { + let ident = syn::Ident::new(self.ident, Span::call_site()); + (None, Some(quote!(#ident))) + } +} + +impl FreeVariable for RuntimeData {} diff --git a/stagefright/src/runtime_support.rs b/stagefright/src/runtime_support.rs new file mode 100644 index 00000000000..9c3fa557612 --- /dev/null +++ b/stagefright/src/runtime_support.rs @@ -0,0 +1,163 @@ +use std::cell::RefCell; +use std::marker::PhantomData; +use std::mem::MaybeUninit; + +use proc_macro2::TokenStream; +use quote::quote; + +thread_local!(pub static CURRENT_FINAL_CRATE: RefCell> = RefCell::new(None)); + +pub trait ParseFromLiteral { + fn parse_from_literal(literal: &syn::Expr) -> Self; +} + +impl ParseFromLiteral for u32 { + fn parse_from_literal(literal: &syn::Expr) -> Self { + match literal { + syn::Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit_int), + .. + }) => lit_int.base10_parse().unwrap(), + _ => panic!("Expected literal"), + } + } +} + +pub trait ToFreeVariableTokens { + fn to_tokens(&self) -> (Option, Option); +} + +pub trait ToGlobalFreeVariableTokens { + fn to_tokens(&self) -> (Option, Option); +} + +impl T> ToFreeVariableTokens for F { + fn to_tokens(&self) -> (Option, Option) { + let value = self(); + value.to_tokens() + } +} + +impl T> FreeVariable<()> for F {} + +pub trait FreeVariable +where + Self: Sized, +{ + fn uninitialized(self) -> O { + #[allow(clippy::uninit_assumed_init)] + unsafe { + MaybeUninit::uninit().assume_init() + } + } +} + +impl ToFreeVariableTokens for u32 { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + +impl FreeVariable for u32 {} + +impl ToGlobalFreeVariableTokens for u32 { + fn to_tokens(&self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + +pub struct Type { + path: String, +} + +impl Type { + pub fn new(def: &str) -> Type { + Type { + path: def.to_string(), + } + } +} + +impl ToGlobalFreeVariableTokens for Type { + fn to_tokens(&self) -> (Option, Option) { + let final_crate = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + + let final_crate_path: syn::Path = syn::parse_str(final_crate).unwrap(); + let parsed: syn::Path = syn::parse_str(&self.path).unwrap(); + // drop the first element of parsed, which is its crate name + let parsed = parsed.segments.iter().skip(1).collect::>(); + let parsed = syn::Path { + leading_colon: None, + segments: syn::punctuated::Punctuated::from_iter(parsed.into_iter().cloned()), + }; + ( + Some(quote!(use ::#final_crate_path::__flow::#parsed;)), + None, + ) + } +} + +pub struct Import { + module_path: &'static str, + path: &'static str, + as_name: &'static str, + _phantom: PhantomData, +} + +impl Copy for Import {} +impl Clone for Import { + fn clone(&self) -> Self { + Import { + module_path: self.module_path, + path: self.path, + as_name: self.as_name, + _phantom: PhantomData, + } + } +} + +pub fn create_import( + module_path: &'static str, + path: &'static str, + as_name: &'static str, + _unused_type_check: T, +) -> Import { + Import { + module_path, + path, + as_name, + _phantom: PhantomData, + } +} + +impl ToFreeVariableTokens for Import { + fn to_tokens(&self) -> (Option, Option) { + let final_crate = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate_path: syn::Path = syn::parse_str(final_crate).unwrap(); + + let module_path = syn::parse_str::(self.module_path).unwrap(); + let parsed = syn::parse_str::(self.path).unwrap(); + let as_ident = syn::Ident::new(self.as_name, proc_macro2::Span::call_site()); + ( + Some(quote!(use #final_crate_path::#module_path::#parsed as #as_ident;)), + None, + ) + } +} + +impl FreeVariable for Import {} + +impl ToGlobalFreeVariableTokens for Import { + fn to_tokens(&self) -> (Option, Option) { + let final_crate = CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate_path: syn::Path = syn::parse_str(final_crate).unwrap(); + + let module_path = syn::parse_str::(self.module_path).unwrap(); + let parsed = syn::parse_str::(self.path).unwrap(); + let as_ident = syn::Ident::new(self.as_name, proc_macro2::Span::call_site()); + ( + Some(quote!(use #final_crate_path::#module_path::#parsed as #as_ident;)), + None, + ) + } +} diff --git a/stagefright_macro/Cargo.toml b/stagefright_macro/Cargo.toml new file mode 100644 index 00000000000..34444ff5690 --- /dev/null +++ b/stagefright_macro/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "stagefright_macro" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stagefright_macro/" +description = "Helper macros for the stagefright crate" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +lazy_static = "1.4.0" diff --git a/stagefright_macro/src/free_variable/mod.rs b/stagefright_macro/src/free_variable/mod.rs new file mode 100644 index 00000000000..48b42526a65 --- /dev/null +++ b/stagefright_macro/src/free_variable/mod.rs @@ -0,0 +1,231 @@ +use std::collections::HashSet; + +mod prelude; +use prelude::is_prelude; +use syn::punctuated::Punctuated; +use syn::visit::Visit; +use syn::MacroDelimiter; + +#[derive(Debug)] +pub struct ScopeStack { + scopes: Vec<(HashSet, HashSet)>, +} + +impl Default for ScopeStack { + fn default() -> Self { + ScopeStack { + scopes: vec![(HashSet::new(), HashSet::new())], + } + } +} + +impl ScopeStack { + pub fn push(&mut self) { + self.scopes.push((HashSet::new(), HashSet::new())); + } + + pub fn pop(&mut self) { + self.scopes.pop(); + } + + pub fn insert_term(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .0 + .insert(ident.to_string()); + } + + pub fn insert_type(&mut self, ident: syn::Ident) { + self.scopes + .last_mut() + .expect("Scope stack should not be empty") + .1 + .insert(ident.to_string()); + } + + pub fn contains_term(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.0.contains(&ident)) + } + + pub fn contains_type(&self, ident: &syn::Ident) -> bool { + let ident = ident.to_string(); + self.scopes + .iter() + .rev() + .any(|scope| scope.1.contains(&ident)) + } +} + +#[derive(Default)] +pub struct FreeVariableVisitor { + pub free_variables: Vec, + pub current_scope: ScopeStack, +} + +impl<'ast> Visit<'ast> for FreeVariableVisitor { + fn visit_expr_closure(&mut self, i: &'ast syn::ExprClosure) { + self.current_scope.push(); + i.inputs.iter().for_each(|input| match input { + syn::Pat::Ident(pat_ident) => self.current_scope.insert_term(pat_ident.ident.clone()), + syn::Pat::Type(pat_type) => match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()) + } + _ => panic!("Closure parameters must be identifiers"), + }, + _ => panic!("Closure parameters must be identifiers"), + }); + + syn::visit::visit_expr_closure(self, i); + + self.current_scope.pop(); + } + + fn visit_item_fn(&mut self, i: &'ast syn::ItemFn) { + self.current_scope.push(); + syn::visit::visit_item_fn(self, i); + self.current_scope.pop(); + } + + fn visit_generic_param(&mut self, i: &'ast syn::GenericParam) { + match i { + syn::GenericParam::Type(type_param) => { + self.current_scope.insert_type(type_param.ident.clone()); + } + syn::GenericParam::Lifetime(lifetime_param) => { + self.current_scope + .insert_type(lifetime_param.lifetime.ident.clone()); + } + syn::GenericParam::Const(const_param) => { + self.current_scope.insert_type(const_param.ident.clone()); + } + } + } + + fn visit_block(&mut self, i: &'ast syn::Block) { + self.current_scope.push(); + syn::visit::visit_block(self, i); + self.current_scope.pop(); + } + + fn visit_local(&mut self, i: &'ast syn::Local) { + i.init.iter().for_each(|init| { + syn::visit::visit_local_init(self, init); + }); + + match &i.pat { + syn::Pat::Ident(pat_ident) => { + self.current_scope.insert_term(pat_ident.ident.clone()); + } + _ => panic!("Local variables must be identifiers"), + } + } + + fn visit_ident(&mut self, i: &'ast proc_macro2::Ident) { + if !self.current_scope.contains_term(i) { + self.free_variables.push(i.clone()); + } + } + + fn visit_lifetime(&mut self, i: &'ast syn::Lifetime) { + if !self.current_scope.contains_type(&i.ident) { + self.free_variables.push(i.ident.clone()); + } + } + + fn visit_path(&mut self, i: &'ast syn::Path) { + if i.leading_colon.is_none() && !is_prelude(&i.segments.first().unwrap().ident) { + let node = i.segments.first().unwrap(); + if i.segments.len() == 1 && !self.current_scope.contains_term(&node.ident) { + self.free_variables.push(node.ident.clone()); + } + } + + for node in i.segments.iter() { + self.visit_path_arguments(&node.arguments); + } + } + + fn visit_arm(&mut self, i: &'ast syn::Arm) { + self.current_scope.push(); + syn::visit::visit_arm(self, i); + self.current_scope.pop(); + } + + fn visit_field_pat(&mut self, i: &'ast syn::FieldPat) { + for it in &i.attrs { + self.visit_attribute(it); + } + self.visit_pat(&i.pat); + } + + fn visit_pat_ident(&mut self, i: &'ast syn::PatIdent) { + self.current_scope.insert_term(i.ident.clone()); + } + + fn visit_expr_method_call(&mut self, i: &'ast syn::ExprMethodCall) { + syn::visit::visit_expr(self, &i.receiver); + } + + fn visit_type(&mut self, _: &'ast syn::Type) {} + + fn visit_expr_struct(&mut self, node: &'ast syn::ExprStruct) { + for it in &node.attrs { + self.visit_attribute(it); + } + if let Some(it) = &node.qself { + self.visit_qself(it); + } + self.visit_path(&node.path); + for el in Punctuated::pairs(&node.fields) { + let it = el.value(); + self.visit_expr(&it.expr); + } + if let Some(it) = &node.rest { + self.visit_expr(it); + } + } + + fn visit_expr_field(&mut self, i: &'ast syn::ExprField) { + self.visit_expr(&i.base); + } + + fn visit_macro(&mut self, i: &'ast syn::Macro) { + // TODO(shadaj): emit a warning if our guess at parsing fails + match i.delimiter { + MacroDelimiter::Paren(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + MacroDelimiter::Brace(_binding_0) => i + .parse_body_with(syn::Block::parse_within) + .ok() + .iter() + .flatten() + .for_each(|stmt| { + self.visit_stmt(stmt); + }), + MacroDelimiter::Bracket(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + } + } +} diff --git a/stagefright_macro/src/free_variable/prelude.rs b/stagefright_macro/src/free_variable/prelude.rs new file mode 100644 index 00000000000..0a16bb42f07 --- /dev/null +++ b/stagefright_macro/src/free_variable/prelude.rs @@ -0,0 +1,72 @@ +use std::collections::HashSet; + +use lazy_static::lazy_static; + +lazy_static! { + static ref PRELUDE: HashSet<&'static str> = { + vec![ + // https://doc.rust-lang.org/core/ + "bool", + "char", + "f32", + "f64", + "i8", + "i16", + "i32", + "i64", + "i128", + "isize", + "str", + "u8", + "u16", + "u32", + "u64", + "u128", + "usize", + // https://doc.rust-lang.org/std/prelude/index.html + "Copy", + "Send", + "Sized", + "Sync", + "Unpin", + "Drop", + "Fn", + "FnMut", + "FnOnce", + "drop", + "Box", + "ToOwned", + "Clone", + "PartialEq", + "PartialOrd", + "Eq", + "Ord", + "AsRef", + "AsMut", + "Into", + "From", + "Default", + "Iterator", + "Extend", + "IntoIterator", + "DoubleEndedIterator", + "ExactSizeIterator", + "Option", + "Some", + "None", + "Result", + "Ok", + "Err", + "String", + "ToString", + "Vec", + ] + .into_iter() + .collect::>() + }; +} + +pub fn is_prelude(ident: &syn::Ident) -> bool { + let ident_str = ident.to_string(); + PRELUDE.contains(&ident_str.as_str()) +} diff --git a/stagefright_macro/src/lib.rs b/stagefright_macro/src/lib.rs new file mode 100644 index 00000000000..6707697cbcc --- /dev/null +++ b/stagefright_macro/src/lib.rs @@ -0,0 +1,394 @@ +use proc_macro2::{Punct, Spacing, Span, TokenStream}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::punctuated::Punctuated; +use syn::spanned::Spanned; +use syn::visit::Visit; +use syn::{AngleBracketedGenericArguments, PathArguments, Token, Type}; + +mod free_variable; +use free_variable::*; + +#[proc_macro] +pub fn q(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let stagefright_crate = proc_macro_crate::crate_name("stagefright") + .expect("stagefright should be present in `Cargo.toml`"); + let root = match stagefright_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stagefright }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let expr = syn::parse_macro_input!(input as syn::Expr); + let mut visitor = FreeVariableVisitor::default(); + visitor.visit_expr(&expr); + + let free_variables = visitor.free_variables.iter().map(|i| { + let ident = i.clone(); + let ident_str = ident.to_string(); + quote!((#ident_str.to_string(), ::#root::runtime_support::ToFreeVariableTokens::to_tokens(&#ident))) + }); + + let cloned_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(non_upper_case_globals, non_snake_case)] + let #i_without_span = #i_without_span; + ) + }); + + let unitialized_free_variables = visitor.free_variables.iter().map(|i| { + let mut i_without_span = i.clone(); + i_without_span.set_span(Span::call_site()); + quote!( + #[allow(unused, non_upper_case_globals, non_snake_case)] + let #i = ::#root::runtime_support::FreeVariable::uninitialized(#i_without_span) + ) + }); + + let free_variables_vec = quote!(vec![#(#free_variables),*]); + + let expr_string = expr.clone().into_token_stream().to_string(); + proc_macro::TokenStream::from(quote!({ + #(#cloned_free_variables;)* + move |set_mod: &mut String, set_str: &mut String, set_vec: &mut Vec<(String, (Option<#root::internal::TokenStream>, Option<#root::internal::TokenStream>))>, run: bool| { + *set_mod = module_path!().to_string(); + *set_str = #expr_string.to_string(); + *set_vec = #free_variables_vec; + + if !run { + unsafe { + return ::std::mem::MaybeUninit::uninit().assume_init(); + } + } + + #[allow(unreachable_code)] + { + #(#unitialized_free_variables;)* + #expr + } + } + })) +} + +fn gen_use_paths( + root: TokenStream, + is_rooted: bool, + mut prefix: Vec, + tree: &syn::UseTree, + into: &mut Vec, +) { + match &tree { + syn::UseTree::Path(path) => { + prefix.push(path.ident.clone()); + gen_use_paths(root, is_rooted, prefix, &path.tree, into); + } + syn::UseTree::Group(group) => { + for tree in &group.items { + gen_use_paths(root.clone(), is_rooted, prefix.clone(), tree, into); + } + } + syn::UseTree::Name(name) => { + let name_ident = name.ident.clone(); + let mut name_ident_unspanned = name_ident.clone(); + name_ident_unspanned.set_span(Span::call_site()); + let prefix_unspanned = prefix + .iter() + .map(|i| { + let mut i = i.clone(); + i.set_span(Span::call_site()); + i + }) + .collect::>(); + + if is_rooted { + let full_path = quote!(#(#prefix::)*#name_ident).to_string(); + + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident_unspanned = #root::runtime_support::create_import( + #full_path, + { + let __quse_local = (); + { + use ::#(#prefix_unspanned::)*#name_ident_unspanned as __quse_local; + __quse_local + } + } + ); + }); + } else if !prefix.is_empty() { + let first = prefix.first().unwrap(); + let prefix_suffix = prefix.iter().skip(1); + let suffix_full_path = quote!(#(#prefix_suffix::)*#name_ident).to_string(); + + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident_unspanned = #first.extend( + #suffix_full_path, + { + let __quse_local = (); + { + use #(#prefix_unspanned::)*#name_ident_unspanned as __quse_local; + __quse_local + } + } + ); + }); + } else { + into.push(quote! { + #[allow(non_upper_case_globals, non_snake_case)] + let #name_ident = #root::runtime_support::Import::clone(&#name_ident); + }); + } + } + _ => todo!(), + } +} + +#[proc_macro] +pub fn quse_fn(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let stagefright_crate = proc_macro_crate::crate_name("stagefright") + .expect("stagefright should be present in `Cargo.toml`"); + let root = match stagefright_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stagefright }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let input_tokens = proc_macro2::TokenStream::from(input); + let import: syn::ItemUse = syn::parse_quote!(use #input_tokens;); + let mut all_paths_emitted = vec![]; + gen_use_paths( + root, + import.leading_colon.is_some(), + vec![], + &import.tree, + &mut all_paths_emitted, + ); + + quote! { + use #input_tokens; + #(#all_paths_emitted;)* + } + .into() +} + +#[proc_macro_attribute] +pub fn entry( + attr: proc_macro::TokenStream, + input: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let stagefright_crate = proc_macro_crate::crate_name("stagefright") + .expect("stagefright should be present in `Cargo.toml`"); + let root = match stagefright_crate { + proc_macro_crate::FoundCrate::Itself => quote! { stagefright }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let attr_params = + syn::parse_macro_input!(attr with Punctuated::parse_terminated); + + let input = syn::parse_macro_input!(input as syn::ItemFn); + let input_name = &input.sig.ident; + + let input_generics = &input.sig.generics; + + let mut runtime_data_params = Vec::new(); + let mut runtime_data_args = Vec::new(); + + let param_parsing = input.sig.inputs.iter().skip(1).enumerate().flat_map(|(i, input)| { + match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let runtime_tpe = match pat_type.ty.as_ref() { + Type::Path(path) => { + if path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" { + match &path.path.segments[0].arguments { + PathArguments::AngleBracketed(AngleBracketedGenericArguments { + args, + .. + }) => Some(args[0].clone()), + _ => None, + } + } else { + None + } + } + _ => None, + }; + + let pat = pat_type.pat.clone(); + let ty = pat_type.ty.clone(); + + if let Some(runtime_tpe) = runtime_tpe { + let mut visitor = FreeVariableVisitor::default(); + + visitor.current_scope.insert_type(syn::Ident::new("RuntimeData", Span::call_site())); + + visitor.visit_generics(input_generics); + visitor.visit_generic_argument(&runtime_tpe); + + let mut out = vec![]; + + visitor.free_variables.iter().for_each(|i| { + let mut i_unspanned = i.clone(); + i_unspanned.set_span(Span::call_site()); + out.push(quote! { + if let Some(prelude) = ::#root::runtime_support::ToFreeVariableTokens::to_tokens(&#i_unspanned).0 { + runtime_data_prelude.push(prelude); + } + }); + }); + + runtime_data_params.push(quote! { + #pat: #runtime_tpe + }); + runtime_data_args.push(quote! { + ##pat + }); + + out.push(quote_spanned! {input.span()=> + let #pat: &#root::internal::syn::Expr = &input_parsed[#i]; + }); + + out + } else { + vec![quote_spanned! {input.span()=> + let #pat: #ty = #root::runtime_support::ParseFromLiteral::parse_from_literal(&input_parsed[#i]); + }] + } + } + } + }); + + let params_to_pass = input.sig.inputs.iter().skip(1).map(|input| match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let is_runtime = match pat_type.ty.as_ref() { + Type::Path(path) => { + path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" + } + _ => false, + }; + + if is_runtime { + let pat_ident = match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => pat_ident, + _ => panic!("RuntimeData must be an identifier"), + }; + let pat_str = pat_ident.ident.to_string(); + quote!(#root::RuntimeData::new(#pat_str)) + } else { + let pat = pat_type.pat.clone(); + quote!(#pat) + } + } + }); + + let expected_arg_count = input.sig.inputs.len() - 1; + + let pound = Punct::new('#', Spacing::Alone); + let passed_generics = if attr_params.is_empty() { + quote!() + } else { + quote!(::<#attr_params>) + }; + + // the return type is always of form `impl Quoted`, this grabs `T` and any free variable imports for it + let (return_type_free, return_type_inner) = match &input.sig.output { + syn::ReturnType::Type(_, ty) => match ty.as_ref() { + Type::ImplTrait(impl_trait) => match impl_trait.bounds.first().unwrap() { + syn::TypeParamBound::Trait(quoted_path) => { + match "ed_path.path.segments[0].arguments { + syn::PathArguments::AngleBracketed(args) => { + match args.args.first().unwrap() { + syn::GenericArgument::Type(ty) => { + let mut visitor = FreeVariableVisitor::default(); + + visitor.visit_generics(input_generics); + visitor.visit_type(ty); + + let mut out = vec![]; + + visitor.free_variables.iter().for_each(|i| { + let mut i_unspanned = i.clone(); + i_unspanned.set_span(Span::call_site()); + out.push(quote! { + if let Some(prelude) = ::#root::runtime_support::ToFreeVariableTokens::to_tokens(&#i_unspanned).0 { + runtime_data_prelude.push(prelude); + } + }); + }); + + (out, ty.clone()) + } + _ => panic!(), + } + } + _ => panic!(), + } + } + _ => panic!(), + }, + _ => panic!(), + }, + _ => panic!(), + }; + + let orig_visibility = input.vis.clone(); + + proc_macro::TokenStream::from(quote_spanned! {input.span()=> + #orig_visibility fn #input_name(input: #root::internal::TokenStream) -> #root::internal::TokenStream { + #[allow(unused)] + let input_parsed = #root::internal::syn::parse::Parser::parse( + #root::internal::syn::punctuated::Punctuated::<#root::internal::syn::Expr, #root::internal::syn::Token![,]>::parse_terminated, + input.into() + ).unwrap(); + + if input_parsed.len() != #expected_arg_count { + panic!("Expected {} arguments, got {}", #expected_arg_count, input_parsed.len()); + } + + #[allow(unused_mut)] + let mut runtime_data_prelude = ::std::vec::Vec::<#root::internal::TokenStream>::new(); + + #(#param_parsing)* + #(#return_type_free)* + + #input + let dataflow_core = { + let graph = #root::QuotedContext::create(); + #root::Quoted::build(#input_name #passed_generics(&graph, #(#params_to_pass),*)) + }; + + let final_crate = #root::runtime_support::CURRENT_FINAL_CRATE.with(|f| *f.borrow()).unwrap(); + let final_crate_path: #root::internal::syn::Path = #root::internal::syn::parse_str(final_crate).unwrap(); + + let module_path: #root::internal::syn::Path = #root::internal::syn::parse_str(module_path!()).unwrap(); + let module_path = module_path.segments.iter().skip(1).collect::>(); + let module_path = #root::internal::syn::Path { + leading_colon: None, + segments: #root::internal::syn::punctuated::Punctuated::from_iter(module_path.into_iter().cloned()), + }; + + ::#root::internal::quote!({ + use #pound final_crate_path ::__flow:: #pound module_path *; + #pound (#pound runtime_data_prelude)* + fn create_flow #input_generics( + #(#runtime_data_params),* + ) -> #return_type_inner { + #pound dataflow_core + } + create_flow(#(#runtime_data_args),*) + }) + } + }) +} diff --git a/stagefright_tool/Cargo.toml b/stagefright_tool/Cargo.toml new file mode 100644 index 00000000000..e1aa0a01924 --- /dev/null +++ b/stagefright_tool/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "stagefright_tool" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/stagefright_macro/" +description = "Helper macros for the stagefright crate" + +[lib] +path = "src/lib.rs" + +[dependencies] +syn-inline-mod = "0.6.0" +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +lazy_static = "1.4.0" diff --git a/stagefright_tool/src/lib.rs b/stagefright_tool/src/lib.rs new file mode 100644 index 00000000000..e41a393b74b --- /dev/null +++ b/stagefright_tool/src/lib.rs @@ -0,0 +1,219 @@ +use std::path::Path; +use std::{env, fs}; + +use proc_macro2::Span; +use quote::ToTokens; +use syn::parse_quote; +use syn::visit_mut::VisitMut; + +struct GenMacroVistor { + exported_macros: Vec, + current_mod: syn::Path, +} + +// marks everything as pub(crate) because proc-macros cannot actually export anything +impl VisitMut for GenMacroVistor { + fn visit_item_enum_mut(&mut self, i: &mut syn::ItemEnum) { + if matches!(i.vis, syn::Visibility::Public(_)) { + i.vis = parse_quote!(pub(crate)); + } + } + + fn visit_item_mod_mut(&mut self, i: &mut syn::ItemMod) { + let old_mod = self.current_mod.clone(); + self.current_mod = parse_quote!(#old_mod::#i.ident); + + syn::visit_mut::visit_item_mod_mut(self, i); + + self.current_mod = old_mod; + } + + fn visit_item_fn_mut(&mut self, i: &mut syn::ItemFn) { + let is_entry = i + .attrs + .iter() + .any(|a| a.path().to_token_stream().to_string() == "stagefright :: entry"); + + if is_entry { + let cur_path = &self.current_mod; + let i_name = &i.sig.ident; + self.exported_macros.push(parse_quote!(#cur_path::#i_name)); + } + + if matches!(i.vis, syn::Visibility::Public(_)) { + i.vis = parse_quote!(pub(crate)); + } + } + + fn visit_item_mut(&mut self, i: &mut syn::Item) { + syn::visit_mut::visit_item_mut(self, i); + } +} + +pub fn gen_macro(flow_path: &Path, final_crate: &str) { + let out_dir = env::var_os("OUT_DIR").unwrap(); + let dest_path = Path::new(&out_dir).join("lib.rs"); + + let mut flow_lib = + syn_inline_mod::parse_and_inline_modules(&flow_path.join("src").join("lib.rs")); + let mut visitor = GenMacroVistor { + exported_macros: vec![], + current_mod: parse_quote!(crate::__flow), + }; + visitor.visit_file_mut(&mut flow_lib); + + for exported in visitor.exported_macros { + let underscored_path = syn::Ident::new( + &exported + .segments + .iter() + .map(|s| s.ident.to_string()) + .collect::>() + .join("_"), + Span::call_site(), + ); + + let proc_macro_wrapper: syn::ItemFn = parse_quote!( + #[proc_macro] + #[allow(non_snake_case)] + pub fn #underscored_path(input: ::proc_macro::TokenStream) -> ::proc_macro::TokenStream { + let input = ::stagefright::internal::TokenStream::from(input); + let out = ::stagefright::runtime_support::CURRENT_FINAL_CRATE.with(|f| { + let mut f = f.borrow_mut(); + *f = Some(#final_crate); + drop(f); + #exported(input) + }); + ::proc_macro::TokenStream::from(out) + } + ); + + flow_lib.items.push(syn::Item::Fn(proc_macro_wrapper)); + } + + fs::write(&dest_path, flow_lib.to_token_stream().to_string()).unwrap(); + println!("cargo:rerun-if-changed=build.rs"); + + let flow_path_absolute = fs::canonicalize(flow_path).unwrap(); + println!( + "cargo:rerun-if-changed={}", + flow_path_absolute.to_string_lossy() + ); +} + +struct GenFinalVistor { + current_mod: syn::Path, +} + +impl VisitMut for GenFinalVistor { + fn visit_item_mod_mut(&mut self, i: &mut syn::ItemMod) { + let old_mod = self.current_mod.clone(); + self.current_mod = parse_quote!(#old_mod::#i.ident); + + syn::visit_mut::visit_item_mod_mut(self, i); + + self.current_mod = old_mod; + } + + fn visit_item_mut(&mut self, i: &mut syn::Item) { + match i { + syn::Item::Fn(fun) => { + let is_entry = fun + .attrs + .iter() + .any(|a| a.path().to_token_stream().to_string() == "stagefright :: entry"); + + if is_entry { + let cur_path = &self.current_mod; + let i_name = &fun.sig.ident; + let path: syn::Path = parse_quote!(#cur_path::#i_name); + + let underscored_path = syn::Ident::new( + &path + .segments + .iter() + .map(|s| s.ident.to_string()) + .collect::>() + .join("_"), + Span::call_site(), + ); + + *i = parse_quote!(pub use crate::__macro::#underscored_path as #i_name;); + } + } + syn::Item::Enum(e) => { + if matches!(e.vis, syn::Visibility::Public(_)) { + let cur_path = &self.current_mod; + let e_name = &e.ident; + *i = parse_quote!(pub use #cur_path::#e_name;); + } + } + _ => {} + } + + syn::visit_mut::visit_item_mut(self, i); + } +} + +struct GenFinalPubVistor { + current_mod: syn::Path, +} + +impl VisitMut for GenFinalPubVistor { + fn visit_item_enum_mut(&mut self, i: &mut syn::ItemEnum) { + i.vis = parse_quote!(pub); + } + + fn visit_item_use_mut(&mut self, i: &mut syn::ItemUse) { + i.vis = parse_quote!(pub); + } + + fn visit_item_mod_mut(&mut self, i: &mut syn::ItemMod) { + let old_mod = self.current_mod.clone(); + self.current_mod = parse_quote!(#old_mod::#i.ident); + + i.vis = parse_quote!(pub); + + syn::visit_mut::visit_item_mod_mut(self, i); + + self.current_mod = old_mod; + } +} + +pub fn gen_final(flow_path: &Path) { + let out_dir = env::var_os("OUT_DIR").unwrap(); + + let mut flow_lib = + syn_inline_mod::parse_and_inline_modules(&flow_path.join("src").join("lib.rs")); + let mut flow_lib_pub = flow_lib.clone(); + + let mut final_visitor = GenFinalVistor { + current_mod: parse_quote!(crate::__flow), + }; + final_visitor.visit_file_mut(&mut flow_lib); + + let mut final_pub_visitor = GenFinalPubVistor { + current_mod: parse_quote!(crate::__flow), + }; + final_pub_visitor.visit_file_mut(&mut flow_lib_pub); + + fs::write( + Path::new(&out_dir).join("lib.rs"), + flow_lib.to_token_stream().to_string(), + ) + .unwrap(); + + fs::write( + Path::new(&out_dir).join("lib_pub.rs"), + flow_lib_pub.to_token_stream().to_string(), + ) + .unwrap(); + + println!("cargo:rerun-if-changed=build.rs"); + + let flow_path_absolute = fs::canonicalize(flow_path).unwrap(); + println!( + "cargo:rerun-if-changed={}", + flow_path_absolute.to_string_lossy() + ); +} diff --git a/website_playground/src/lib.rs b/website_playground/src/lib.rs index bbf61f43e9d..d83c4ca311a 100644 --- a/website_playground/src/lib.rs +++ b/website_playground/src/lib.rs @@ -197,13 +197,13 @@ pub fn compile_datalog(program: String) -> JsValue { serde_wasm_bindgen::to_value(&out).unwrap() } -struct HydroflowInstance { - hydroflow: Hydroflow, +struct HydroflowInstance<'a, In, Out> { + hydroflow: Hydroflow<'a>, input: tokio::sync::mpsc::UnboundedSender, output: tokio::sync::mpsc::UnboundedReceiver, } -type DatalogBooleanDemoInstance = HydroflowInstance<(i32,), (i32,)>; +type DatalogBooleanDemoInstance = HydroflowInstance<'static, (i32,), (i32,)>; thread_local! { static DATALOG_BOOLEAN_DEMO_INSTANCES: RefCell> =