Skip to content

Commit

Permalink
feat(hydroflow)!: remove import!, fix hydro-project#1110
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Dec 7, 2024
1 parent 032cde6 commit fcbc501
Show file tree
Hide file tree
Showing 11 changed files with 11 additions and 314 deletions.
4 changes: 1 addition & 3 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
fn benchmark_hydroflow_surface(c: &mut Criterion) {
c.bench_function("fork_join/hydroflow/surface", |b| {
b.iter(|| {
let mut hf = hydroflow_syntax! {
source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); });
};
let mut hf = include!("fork_join_20.hf");
hf.run_available();
})
});
Expand Down
8 changes: 4 additions & 4 deletions benches/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ pub fn fork_join() -> std::io::Result<()> {
let file = File::create(path)?;
let mut write = BufWriter::new(file);

writeln!(write, "a0 = mod -> tee();")?;

writeln!(write, "hydroflow_syntax! {{")?;
writeln!(write, "a0 = source_iter(0..NUM_INTS) -> tee();")?;
for i in 0..NUM_OPS {
if i > 0 {
writeln!(write, "a{} = union() -> tee();", i)?;
}
writeln!(write, "a{} -> filter(|x| x % 2 == 0) -> a{};", i, i + 1)?;
writeln!(write, "a{} -> filter(|x| x % 2 == 1) -> a{};", i, i + 1)?;
}

writeln!(write, "a{} = union() -> mod;", NUM_OPS)?;
writeln!(write, "a{} = union() -> for_each(|x| {{ black_box(x); }});", NUM_OPS)?;
writeln!(write, "}}")?;

write.flush()?;

Expand Down
8 changes: 0 additions & 8 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ required-features = [ "nightly" ]
name = "python_udf"
required-features = [ "python" ]

[[example]]
name = "modules_outer_join"
required-features = [ "debugging" ]

[[example]]
name = "modules_triple_cross_join"
required-features = [ "debugging" ]

[dependencies]
bincode = "1.3.1"
byteorder = "1.3.2"
Expand Down
23 changes: 0 additions & 23 deletions hydroflow/examples/modules_outer_join/full_outer_join.hf

This file was deleted.

16 changes: 0 additions & 16 deletions hydroflow/examples/modules_outer_join/left_outer_join.hf

This file was deleted.

30 changes: 0 additions & 30 deletions hydroflow/examples/modules_outer_join/main.rs

This file was deleted.

6 changes: 0 additions & 6 deletions hydroflow/examples/modules_outer_join/right_outer_join.hf

This file was deleted.

51 changes: 0 additions & 51 deletions hydroflow/examples/modules_triple_cross_join/main.rs

This file was deleted.

15 changes: 0 additions & 15 deletions hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf

This file was deleted.

111 changes: 0 additions & 111 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,120 +276,9 @@ impl FlatGraphBuilder {
out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))),
}
}
Pipeline::Import(import) => {
// TODO: https://github.com/rust-lang/rfcs/pull/3200
// this would be way better...
let file_path = {
let mut dir = self.invocating_file_path.clone();
dir.pop();
dir.join(import.filename.value())
};

let file_contents = match std::fs::read_to_string(&file_path) {
Ok(contents) => contents,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.filename.span(),
Level::Error,
format!("filename: {}, err: {err}", import.filename.value()),
));

return Ends {
inn: None,
out: None,
};
}
};

let statements = match syn::parse_str::<HfCode>(&file_contents) {
Ok(code) => code,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.span(),
Level::Error,
format!("Error in module: {}", err),
));

return Ends {
inn: None,
out: None,
};
}
};

let flat_graph_builder = FlatGraphBuilder::from_hfmodule(statements, file_path);
let (flat_graph, _uses, diagnostics) = flat_graph_builder.build();
diagnostics.iter().for_each(Diagnostic::emit);

self.merge_in(flat_graph, import.span())
}
}
}

/// Merge one flatgraph into the current flatgraph
/// other must be a flatgraph and not be partitioned yet.
fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends {
assert_eq!(other.subgraphs().count(), 0);

let mut ends = Ends {
inn: None,
out: None,
};

let mut node_mapping = BTreeMap::new();

for (other_node_id, node) in other.nodes() {
match node {
GraphNode::Operator(_) => {
let varname = other.node_varname(other_node_id);
let new_id = self.flat_graph.insert_node(node.clone(), varname, None);
node_mapping.insert(other_node_id, new_id);
}
GraphNode::ModuleBoundary { input, .. } => {
let new_id = self.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: *input,
import_expr: parent_span,
},
Some(Ident::new(&format!("module_{}", input), parent_span)),
None,
);
node_mapping.insert(other_node_id, new_id);

// in the case of nested imports, this module boundary might not be the module boundary into or out of the top-most module
// So we have to be careful to only target those two boundaries.
// There should be no inputs to it, if it is an input boundary, if it is the top-most one.
// and there should be no outputs from it, if it is an output boundary, if it is the top-most one.
if *input && other.node_predecessor_nodes(other_node_id).count() == 0 {
if other.node_predecessor_nodes(other_node_id).count() == 0 {
ends.inn =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
} else if !(*input) && other.node_successor_nodes(other_node_id).count() == 0 {
ends.out =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
}
GraphNode::Handoff { .. } => {
panic!("Handoff in graph that is being merged into self")
}
}
}

for (other_edge_id, (other_src, other_dst)) in other.edges() {
let (src_port, dst_port) = other.edge_ports(other_edge_id);

let _new_edge_id = self.flat_graph.insert_edge(
*node_mapping.get(&other_src).unwrap(),
src_port.clone(),
*node_mapping.get(&other_dst).unwrap(),
dst_port.clone(),
);
}

ends
}

/// Connects operator links as a final building step. Processes all the links stored in
/// `self.links` and actually puts them into the graph.
fn connect_operator_links(&mut self) {
Expand Down
Loading

0 comments on commit fcbc501

Please sign in to comment.