Skip to content

Commit

Permalink
feat: generalized hash trie indexes for relational tuples (#1503)
Browse files Browse the repository at this point in the history
Generalized Hash Tries are part of the SIGMOD '23 FreeJoin
[paper](https://dl.acm.org/doi/abs/10.1145/3589295) by
Wang/Willsey/Suciu. They provide a compressed ("factorized")
representation of relations. By operating in the factorized domain, join
algorithms can defer cross-products and achieve asymptotically optimal
performance.

---------

Co-authored-by: Mingwei Samuel <mingwei.samuel@gmail.com>
Co-authored-by: Andre Giron <agiron123@gmail.com>
  • Loading branch information
3 people authored Oct 29, 2024
1 parent b961233 commit f7e740f
Show file tree
Hide file tree
Showing 24 changed files with 2,612 additions and 29 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"stageleft_tool",
"topolotree",
"variadics",
"variadics_macro",
"website_playground",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ Hello 6
Hello 7
Hello 8
Hello 9

Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,3 @@ subgraph sg_4v1 ["sg_4v1 stratum 1"]
9v1
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ subgraph sg_3v1 ["sg_3v1 stratum 1"]
3v1
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ subgraph sg_2v1 ["sg_2v1 stratum 0"]
3v1
end
end

52 changes: 52 additions & 0 deletions hydroflow/tests/surface_lattice_bimorphism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ use std::collections::{HashMap, HashSet};

use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use lattices::ght::lattice::{DeepJoinLatticeBimorphism, GhtBimorphism};
use lattices::ght::GeneralizedHashTrieNode;
use lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
use lattices::GhtType;
use multiplatform_test::multiplatform_test;
use variadics::variadic_collections::VariadicHashSet;
use variadics::CloneVariadic;

#[multiplatform_test]
pub fn test_cartesian_product() {
Expand Down Expand Up @@ -134,3 +139,50 @@ pub fn test_cartesian_product_tick_state() {
&*collect_ready::<Vec<_>, _>(&mut out_recv)
);
}

#[test]
fn test_ght_join_bimorphism() {
type MyGhtATrie = GhtType!(u32, u64, u16 => &'static str: VariadicHashSet);
type MyGhtBTrie = GhtType!(u32, u64, u16 => &'static str: VariadicHashSet);

type JoinSchema = variadics::var_type!(u32, u64, u16, &'static str, &'static str);

type MyNodeBim = <(MyGhtATrie, MyGhtBTrie) as DeepJoinLatticeBimorphism<
VariadicHashSet<JoinSchema>,
>>::DeepJoinLatticeBimorphism;
type MyBim = GhtBimorphism<MyNodeBim>;

let mut hf = hydroflow_syntax! {
lhs = source_iter([
var_expr!(123, 2, 5, "hello"),
var_expr!(50, 1, 1, "hi"),
var_expr!(5, 1, 7, "hi"),
var_expr!(5, 1, 7, "bye"),
])
-> map(|row| MyGhtATrie::new_from([row]))
-> state::<'tick, MyGhtATrie>();
rhs = source_iter([
var_expr!(5, 1, 8, "hi"),
var_expr!(5, 1, 7, "world"),
var_expr!(5, 1, 7, "folks"),
var_expr!(10, 1, 2, "hi"),
var_expr!(12, 10, 98, "bye"),
])
-> map(|row| MyGhtBTrie::new_from([row]))
-> state::<'tick, MyGhtBTrie>();

lhs[items] -> [0]my_join;
rhs[items] -> [1]my_join;


my_join = lattice_bimorphism(MyBim::default(), #lhs, #rhs)
-> lattice_reduce()
-> enumerate()
-> inspect(|x| println!("{:?} {:#?}", context.current_tick(), x))
-> flat_map(|(_num, ght)| ght.recursive_iter().map(<JoinSchema as CloneVariadic>::clone_ref_var).collect::<Vec<_>>())
-> null();
// -> for_each(|x| println!("{:#?}\n", x));
};
// hf.meta_graph().unwrap().open_mermaid(&Default::default());
hf.run_available();
}
73 changes: 73 additions & 0 deletions hydroflow/tests/surface_lattice_generalized_hash_trie.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::ght::lattice::{DeepJoinLatticeBimorphism, GhtBimorphism};
use hydroflow::lattices::ght::GeneralizedHashTrieNode;
use hydroflow::lattices::GhtType;
use hydroflow::util::collect_ready;
use hydroflow::variadics::{var_expr, var_type};
use variadics::variadic_collections::VariadicHashSet; // Import the Insert trait

#[test]
fn test_basic() {
type MyGht = GhtType!(u16, u32 => u64: VariadicHashSet);
type FlatTup = var_type!(u16, u32, u64);
let input: Vec<FlatTup> = vec![
var_expr!(42, 314, 43770),
var_expr!(42, 315, 43770),
var_expr!(42, 314, 30619),
var_expr!(43, 10, 600),
];
let mut merged = MyGht::default();
for i in input.clone() {
merged.insert(i);
}
println!("merged: {:?}", merged);
let mut df = hydroflow_syntax! {
source_iter(input)
-> map(|t| MyGht::new_from(vec![t]))
-> lattice_fold::<'static>(MyGht::default)
-> inspect(|t| println!("{:?}", t))
-> assert(|x: &MyGht| x.eq(&merged))
-> null();
};
df.run_available();
}

#[test]
fn test_join() {
type MyGht = GhtType!(u8 => u16: VariadicHashSet);
type ResultGht = GhtType!(u8 => u16, u16: VariadicHashSet);
let (out_send, out_recv) = hydroflow::util::unbounded_channel::<_>();

let r = vec![
var_expr!(1, 10),
var_expr!(2, 20),
var_expr!(3, 30),
var_expr!(4, 40),
];
let s = vec![var_expr!(1, 10), var_expr!(5, 50)];

type MyNodeBim = <(MyGht, MyGht) as DeepJoinLatticeBimorphism<
VariadicHashSet<var_type!(u8, u16, u16)>,
>>::DeepJoinLatticeBimorphism;
type MyBim = GhtBimorphism<MyNodeBim>;

let mut df = hydroflow_syntax! {
R = source_iter(r)
-> map(|t| MyGht::new_from([t]))
-> state::<MyGht>();
S = source_iter(s)
-> map(|t| MyGht::new_from([t]))
-> state::<MyGht>();
R[items] -> [0]my_join;
S[items] -> [1]my_join;
my_join = lattice_bimorphism(MyBim::default(), #R, #S)
-> lattice_reduce()
-> for_each(|x| out_send.send(x).unwrap());
};
df.run_available();

assert_eq!(
&[ResultGht::new_from(vec![var_expr!(1, 10, 10),])],
&*collect_ready::<Vec<_>, _>(out_recv)
);
}
5 changes: 2 additions & 3 deletions hydroflow_lang/src/graph/ops/anti_join_multiset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use quote::{quote_spanned, ToTokens};
use syn::parse_quote;

use super::{
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints,
OperatorInstance, OperatorWriteOutput, Persistence, PortIndexValue, WriteContextArgs, RANGE_0,
RANGE_1,
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, Persistence, PortIndexValue, WriteContextArgs, RANGE_0, RANGE_1,
};
use crate::diagnostic::{Diagnostic, Level};

Expand Down
3 changes: 3 additions & 0 deletions lattices/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ cc-traits = "2.0.0"
sealed = "0.5.0"
serde = { version = "1.0.197", features = ["derive"], optional = true }
lattices_macro = { path = "../lattices_macro", version = "^0.5.6" }
ref-cast = "1.0.23"
variadics = { path = "../variadics", version = "^0.0.6" }
variadics_macro = { path = "../variadics_macro", version = "^0.5.5" }

[dev-dependencies]
trybuild = "1.0.0"
Loading

0 comments on commit f7e740f

Please sign in to comment.