Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus, stageleft)!: allow cluster self ID to be referenced as a global constant #1574

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow_lang::graph::{partition_graph, HydroflowGraph};
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;
use stageleft::runtime_support::FreeVariableWithContext;
use stageleft::QuotedWithContext;

use crate::staging_util::Invariant;

Expand All @@ -27,7 +27,10 @@ impl<ID> CompiledFlow<'_, ID> {
}

impl<'a> CompiledFlow<'a, usize> {
pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> CompiledFlowWithId<'a> {
pub fn with_dynamic_id(
self,
id: impl QuotedWithContext<'a, usize, ()>,
) -> CompiledFlowWithId<'a> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -82,10 +85,12 @@ impl<'a> CompiledFlow<'a, usize> {
}
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlow<'a, ()> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlow<'a, ()> {}

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlow<'a, ()> {
fn to_tokens(mut self) -> (Option<TokenStream>, Option<TokenStream>) {
impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlow<'a, ()> {
type O = Hydroflow<'a>;

fn to_tokens(mut self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -116,10 +121,12 @@ pub struct CompiledFlowWithId<'a> {
_phantom: Invariant<'a>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlowWithId<'a> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlowWithId<'a> {}

impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlowWithId<'a> {
type O = Hydroflow<'a>;

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlowWithId<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
fn to_tokens(self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(self.tokens))
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hydroflow::futures::{Sink, Stream};
use proc_macro2::Span;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

use super::built::build_inner;
use super::compiled::CompiledFlow;
Expand Down
124 changes: 4 additions & 120 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sha2::{Digest, Sha256};
use stageleft::{Quoted, RuntimeData};
use syn::visit_mut::VisitMut;
use stageleft::{QuotedWithContext, RuntimeData};
use tokio::sync::RwLock;
use trybuild_internals_api::path;

use super::deploy_runtime::*;
use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::trybuild::create_graph_trybuild;
use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};

pub struct HydroDeploy {}
Expand Down Expand Up @@ -373,14 +370,14 @@ impl<'a> Deploy<'a> for HydroDeploy {
fn cluster_ids(
_env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
cluster_members(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
of_cluster,
)
}

fn cluster_self_id(_env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
cluster_self_id(RuntimeData::new("__hydroflow_plus_trybuild_cli"))
}
}
Expand Down Expand Up @@ -872,119 +869,6 @@ impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDepl
}
}

fn clean_name_hint(name_hint: &str) -> String {
name_hint
.replace("::", "__")
.replace(" ", "_")
.replace(",", "_")
.replace("<", "_")
.replace(">", "")
.replace("(", "")
.replace(")", "")
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateNameWithStaged {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateNameWithStaged {
fn visit_type_path_mut(&mut self, i: &mut syn::TypePath) {
if let Some(first) = i.path.segments.first() {
if first.ident == self.crate_name {
let tail = i.path.segments.iter().skip(1).collect::<Vec<_>>();
*i = syn::parse_quote!(crate::__staged #(::#tail)*);
}
}

syn::visit_mut::visit_type_path_mut(self, i);
}
}

// TODO(shadaj): has to be public due to stageleft limitations
#[doc(hidden)]
pub struct ReplaceCrateWithOrig {
pub crate_name: String,
}

impl VisitMut for ReplaceCrateWithOrig {
fn visit_item_use_mut(&mut self, i: &mut syn::ItemUse) {
if let syn::UseTree::Path(p) = &mut i.tree {
if p.ident == "crate" {
p.ident = syn::Ident::new(&self.crate_name, p.ident.span());
i.leading_colon = Some(Default::default());
}
}

syn::visit_mut::visit_item_use_mut(self, i);
}
}

fn create_graph_trybuild(
graph: HydroflowGraph,
extra_stmts: Vec<syn::Stmt>,
name_hint: &Option<String>,
) -> (
String,
(std::path::PathBuf, std::path::PathBuf, Option<Vec<String>>),
) {
let source_dir = trybuild_internals_api::cargo::manifest_dir().unwrap();
let source_manifest = trybuild_internals_api::dependencies::get_manifest(&source_dir).unwrap();
let crate_name = &source_manifest.package.name.to_string().replace("-", "_");

let is_test = super::trybuild::IS_TEST.load(std::sync::atomic::Ordering::Relaxed);

let mut generated_code = compile_graph_trybuild(graph, extra_stmts);

ReplaceCrateNameWithStaged {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut generated_code);

let mut inlined_staged = stageleft_tool::gen_staged_trybuild(
&path!(source_dir / "src" / "lib.rs"),
crate_name.clone(),
is_test,
);

ReplaceCrateWithOrig {
crate_name: crate_name.clone(),
}
.visit_file_mut(&mut inlined_staged);

let source = prettyplease::unparse(&syn::parse_quote! {
#generated_code

#[allow(
unused,
ambiguous_glob_reexports,
clippy::suspicious_else_formatting,
unexpected_cfgs,
reason = "generated code"
)]
pub mod __staged {
#inlined_staged
}
});

let mut hasher = Sha256::new();
hasher.update(&source);
let hash = format!("{:X}", hasher.finalize())
.chars()
.take(8)
.collect::<String>();

let bin_name = if let Some(name_hint) = &name_hint {
format!("{}_{}", clean_name_hint(name_hint), &hash)
} else {
hash
};

let trybuild_created = create_trybuild(&source, &bin_name, is_test).unwrap();
(bin_name, trybuild_created)
}

fn create_trybuild_service(
trybuild: TrybuildHost,
dir: &std::path::PathBuf,
Expand Down
26 changes: 13 additions & 13 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};
use stageleft::{q, QuotedWithContext, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand All @@ -16,13 +16,13 @@ pub struct HydroflowPlusMeta {
pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
of_cluster: usize,
) -> impl Quoted<&Vec<u32>> + Copy {
) -> impl QuotedWithContext<&Vec<u32>, ()> + Copy {
q!(cli.meta.clusters.get(&of_cluster).unwrap())
}

pub fn cluster_self_id(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
) -> impl Quoted<u32> + Copy {
) -> impl QuotedWithContext<u32, ()> + Copy {
q!(cli
.meta
.cluster_id
Expand All @@ -41,15 +41,15 @@ pub fn deploy_o2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -66,15 +66,15 @@ pub fn deploy_o2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -91,15 +91,15 @@ pub fn deploy_m2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -116,15 +116,15 @@ pub fn deploy_m2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -139,7 +139,7 @@ pub fn deploy_e2o(
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
}

pub fn deploy_o2e(
Expand All @@ -152,5 +152,5 @@ pub fn deploy_o2e(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
}
6 changes: 3 additions & 3 deletions hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use hydroflow::bytes::Bytes;
use hydroflow::futures::{Sink, Stream};
use hydroflow::util::deploy::DeployPorts;
use hydroflow_lang::graph::HydroflowGraph;
use stageleft::{Quoted, RuntimeData};
use stageleft::{QuotedWithContext, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
Expand Down Expand Up @@ -170,11 +170,11 @@ impl<'a> Deploy<'a> for DeployRuntime {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
super::deploy_runtime::cluster_members(*env, of_cluster)
}

fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
super::deploy_runtime::cluster_self_id(*env)
}
}
Expand Down
11 changes: 8 additions & 3 deletions hydroflow_plus/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ use hydroflow::futures::{Sink, Stream};
use hydroflow_lang::graph::HydroflowGraph;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

#[cfg(feature = "deploy_runtime")]
pub mod macro_runtime;

#[cfg(feature = "deploy")]
pub(crate) mod trybuild;

// TODO(shadaj): has to be public due to stageleft limitations
#[cfg(feature = "deploy")]
#[doc(hidden)]
pub mod trybuild_rewriters;

pub use macro_runtime::*;
#[cfg(feature = "deploy")]
pub use trybuild::init_test;
Expand Down Expand Up @@ -171,8 +176,8 @@ pub trait Deploy<'a> {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a;
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
}

impl<
Expand Down
Loading
Loading