Skip to content

Commit

Permalink
feat(hydroflow_plus, stageleft)!: allow cluster self ID to be referen…
Browse files Browse the repository at this point in the history
…ced as a global constant (#1574)

This eliminates the need to store `cluster.self_id()` in a local
variable first, instead you can directly reference `CLUSTER_SELF_ID`.
  • Loading branch information
shadaj authored Nov 21, 2024
1 parent 4c5ca31 commit a93a5e5
Show file tree
Hide file tree
Showing 86 changed files with 1,167 additions and 805 deletions.
25 changes: 16 additions & 9 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow_lang::graph::{partition_graph, HydroflowGraph};
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;
use stageleft::runtime_support::FreeVariableWithContext;
use stageleft::QuotedWithContext;

use crate::staging_util::Invariant;

Expand All @@ -27,7 +27,10 @@ impl<ID> CompiledFlow<'_, ID> {
}

impl<'a> CompiledFlow<'a, usize> {
pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> CompiledFlowWithId<'a> {
pub fn with_dynamic_id(
self,
id: impl QuotedWithContext<'a, usize, ()>,
) -> CompiledFlowWithId<'a> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -82,10 +85,12 @@ impl<'a> CompiledFlow<'a, usize> {
}
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlow<'a, ()> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlow<'a, ()> {}

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlow<'a, ()> {
fn to_tokens(mut self) -> (Option<TokenStream>, Option<TokenStream>) {
impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlow<'a, ()> {
type O = Hydroflow<'a>;

fn to_tokens(mut self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -116,10 +121,12 @@ pub struct CompiledFlowWithId<'a> {
_phantom: Invariant<'a>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlowWithId<'a> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlowWithId<'a> {}

impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlowWithId<'a> {
type O = Hydroflow<'a>;

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlowWithId<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
fn to_tokens(self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(self.tokens))
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hydroflow::futures::{Sink, Stream};
use proc_macro2::Span;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

use super::built::build_inner;
use super::compiled::CompiledFlow;
Expand Down
124 changes: 4 additions & 120 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sha2::{Digest, Sha256};
use stageleft::{Quoted, RuntimeData};
use syn::visit_mut::VisitMut;
use stageleft::{QuotedWithContext, RuntimeData};
use tokio::sync::RwLock;
use trybuild_internals_api::path;

use super::deploy_runtime::*;
use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::trybuild::create_graph_trybuild;
use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};

pub struct HydroDeploy {}
Expand Down Expand Up @@ -373,14 +370,14 @@ impl<'a> Deploy<'a> for HydroDeploy {
fn cluster_ids(
_env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
cluster_members(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
of_cluster,
)
}

fn cluster_self_id(_env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
cluster_self_id(RuntimeData::new("__hydroflow_plus_trybuild_cli"))
}
}
Expand Down Expand Up @@ -872,119 +869,6 @@ impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDepl
}
}

fn clean_name_hint(name_hint: &str) -> String {
name_hint
.replace("::", "__")
.replace(" ", "_")
.replace(",", "_")
.replace("<", "_")
.replace(">", "")
.replace("(", "")
.replace(")", "")
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateNameWithStaged {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateNameWithStaged {
fn visit_type_path_mut(&mut self, i: &mut syn::TypePath) {
if let Some(first) = i.path.segments.first() {
if first.ident == self.crate_name {
let tail = i.path.segments.iter().skip(1).collect::<Vec<_>>();
*i = syn::parse_quote!(crate::__staged #(::#tail)*);
}
}

syn::visit_mut::visit_type_path_mut(self, i);
}
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateWithOrig {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateWithOrig {
fn visit_item_use_mut(&mut self, i: &mut syn::ItemUse) {
if let syn::UseTree::Path(p) = &mut i.tree {
if p.ident == "crate" {
p.ident = syn::Ident::new(&self.crate_name, p.ident.span());
i.leading_colon = Some(Default::default());
}
}

syn::visit_mut::visit_item_use_mut(self, i);
}
}

fn create_graph_trybuild(
graph: HydroflowGraph,
extra_stmts: Vec<syn::Stmt>,
name_hint: &Option<String>,
) -> (
String,
(std::path::PathBuf, std::path::PathBuf, Option<Vec<String>>),
) {
let source_dir = trybuild_internals_api::cargo::manifest_dir().unwrap();
let source_manifest = trybuild_internals_api::dependencies::get_manifest(&source_dir).unwrap();
let crate_name = &source_manifest.package.name.to_string().replace("-", "_");

let is_test = super::trybuild::IS_TEST.load(std::sync::atomic::Ordering::Relaxed);

let mut generated_code = compile_graph_trybuild(graph, extra_stmts);

ReplaceCrateNameWithStaged {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut generated_code);

let mut inlined_staged = stageleft_tool::gen_staged_trybuild(
&path!(source_dir / "src" / "lib.rs"),
crate_name.clone(),
is_test,
);

ReplaceCrateWithOrig {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut inlined_staged);

let source = prettyplease::unparse(&syn::parse_quote! {
#generated_code

#[allow(
unused,
ambiguous_glob_reexports,
clippy::suspicious_else_formatting,
unexpected_cfgs,
reason = "generated code"
)]
pub mod __staged {
#inlined_staged
}
});

let mut hasher = Sha256::new();
hasher.update(&source);
let hash = format!("{:X}", hasher.finalize())
.chars()
.take(8)
.collect::<String>();

let bin_name = if let Some(name_hint) = &name_hint {
format!("{}_{}", clean_name_hint(name_hint), &hash)
} else {
hash
};

let trybuild_created = create_trybuild(&source, &bin_name, is_test).unwrap();
(bin_name, trybuild_created)
}

fn create_trybuild_service(
trybuild: TrybuildHost,
dir: &std::path::PathBuf,
Expand Down
26 changes: 13 additions & 13 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};
use stageleft::{q, QuotedWithContext, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand All @@ -16,13 +16,13 @@ pub struct HydroflowPlusMeta {
pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
of_cluster: usize,
) -> impl Quoted<&Vec<u32>> + Copy {
) -> impl QuotedWithContext<&Vec<u32>, ()> + Copy {
q!(cli.meta.clusters.get(&of_cluster).unwrap())
}

pub fn cluster_self_id(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
) -> impl Quoted<u32> + Copy {
) -> impl QuotedWithContext<u32, ()> + Copy {
q!(cli
.meta
.cluster_id
Expand All @@ -41,15 +41,15 @@ pub fn deploy_o2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -66,15 +66,15 @@ pub fn deploy_o2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -91,15 +91,15 @@ pub fn deploy_m2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -116,15 +116,15 @@ pub fn deploy_m2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -139,7 +139,7 @@ pub fn deploy_e2o(
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
}

pub fn deploy_o2e(
Expand All @@ -152,5 +152,5 @@ pub fn deploy_o2e(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
}
6 changes: 3 additions & 3 deletions hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use hydroflow::bytes::Bytes;
use hydroflow::futures::{Sink, Stream};
use hydroflow::util::deploy::DeployPorts;
use hydroflow_lang::graph::HydroflowGraph;
use stageleft::{Quoted, RuntimeData};
use stageleft::{QuotedWithContext, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
Expand Down Expand Up @@ -170,11 +170,11 @@ impl<'a> Deploy<'a> for DeployRuntime {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
super::deploy_runtime::cluster_members(*env, of_cluster)
}

fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
super::deploy_runtime::cluster_self_id(*env)
}
}
Expand Down
11 changes: 8 additions & 3 deletions hydroflow_plus/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ use hydroflow::futures::{Sink, Stream};
use hydroflow_lang::graph::HydroflowGraph;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

#[cfg(feature = "deploy_runtime")]
pub mod macro_runtime;

#[cfg(feature = "deploy")]
pub(crate) mod trybuild;

// TODO(shadaj): has to be public due to stageleft limitations
#[cfg(feature = "deploy")]
#[doc(hidden)]
pub mod trybuild_rewriters;

pub use macro_runtime::*;
#[cfg(feature = "deploy")]
pub use trybuild::init_test;
Expand Down Expand Up @@ -171,8 +176,8 @@ pub trait Deploy<'a> {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a;
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
}

impl<
Expand Down
Loading

0 comments on commit a93a5e5

Please sign in to comment.