Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ hashbrown = "0.12.0"
csv = { version = "1.1", optional = true }
serde = { version = "1.0", optional = true }
impl-trait-for-tuples = "0.2"
deepsize = "0.2.0"
deepsize_derive = "0.1.2"
textwrap = "0.15.0"

# TODO: eliminate dependency on timely-dataflow by cloning relevant
# parts.
timely = "0.12.0"

[[bench]]
name = "galen"
Expand Down
171 changes: 88 additions & 83 deletions benches/galen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

use csv::{Reader as CsvReader, ReaderBuilder};
use dbsp::{
algebra::{FiniteMap, HasZero, ZSetHashMap},
algebra::HasZero,
circuit::{trace::SchedulerEvent, GlobalNodeId, Root, Runtime, Stream},
monitor::TraceMonitor,
operator::{CsvSource, DelayedFeedback},
profile::CPUProfiler,
trace::{
ord::{OrdIndexedZSet, OrdZSet, OrdZSetSpine},
BatchReader,
},
};
use std::{collections::HashMap, fmt::Write, fs, fs::File, path::PathBuf};

Expand Down Expand Up @@ -37,9 +41,9 @@ q(?x,?e,?o) :- q(?x,?y,?z),r(?y,?u,?e),q(?z,?u,?o).
type Number = u32;
type Weight = isize;

fn csv_source<T>(file: &str) -> CsvSource<File, ZSetHashMap<T, Weight>, T>
fn csv_source<T>(file: &str) -> CsvSource<File, T, Weight, OrdZSet<T, Weight>>
where
T: Clone,
T: Clone + Ord,
{
let path: PathBuf = ["benches", "galen_data", file].iter().collect();

Expand All @@ -56,6 +60,7 @@ fn main() {
let monitor = TraceMonitor::new_panic_on_error();

let root = Root::build(|circuit| {
/*
let cpu_profiler = CPUProfiler::new();
cpu_profiler.attach(circuit, "cpu profiler");

Expand All @@ -72,13 +77,14 @@ fn main() {
.or_insert_with(|| String::new());
metadata_string.clear();
node.summary(metadata_string);
//}
//SchedulerEvent::StepEnd => {
}
SchedulerEvent::StepEnd => {
let graph = monitor_clone.visualize_circuit_annotate(&|node_id| {
let mut metadata_string = metadata
.get(node_id)
.map(ToString::to_string)
.unwrap_or_else(|| "".to_string());
writeln!(metadata_string, "id: {}", node_id).unwrap();
if let Some(cpu_profile) = cpu_profiler.operator_profile(node_id) {
writeln!(
metadata_string,
Expand All @@ -97,6 +103,7 @@ fn main() {
_ => {}
}
});
*/

let p_source = csv_source::<(Number, Number)>("p.txt");
let q_source = csv_source::<(Number, Number, Number)>("q.txt");
Expand All @@ -105,133 +112,131 @@ fn main() {
let u_source = csv_source::<(Number, Number, Number)>("u.txt");
let s_source = csv_source::<(Number, Number)>("s.txt");

let p: Stream<_, ZSetHashMap<_, Weight>> =
let p: Stream<_, OrdZSet<_, Weight>> =
circuit.region("p", || circuit.add_source(p_source));
let q: Stream<_, ZSetHashMap<_, Weight>> =
let q: Stream<_, OrdZSet<_, Weight>> =
circuit.region("q", || circuit.add_source(q_source));
let r: Stream<_, ZSetHashMap<_, Weight>> =
let r: Stream<_, OrdZSet<_, Weight>> =
circuit.region("r", || circuit.add_source(r_source));
let c: Stream<_, ZSetHashMap<_, Weight>> =
let c: Stream<_, OrdZSet<_, Weight>> =
circuit.region("c", || circuit.add_source(c_source));
let u: Stream<_, ZSetHashMap<_, Weight>> =
let u: Stream<_, OrdZSet<_, Weight>> =
circuit.region("u", || circuit.add_source(u_source));
let s: Stream<_, ZSetHashMap<_, Weight>> =
let s: Stream<_, OrdZSet<_, Weight>> =
circuit.region("s", || circuit.add_source(s_source));

let (outp, outq) = circuit
.iterate_with_conditions(|child| {
let pvar: DelayedFeedback<_, ZSetHashMap<(Number, Number), Weight>> =
.fixedpoint(|child| {
let pvar: DelayedFeedback<_, OrdZSet<(Number, Number), Weight>> =
DelayedFeedback::new(child);
let qvar: DelayedFeedback<_, ZSetHashMap<(Number, Number, Number), Weight>> =
let qvar: DelayedFeedback<_, OrdZSet<(Number, Number, Number), Weight>> =
DelayedFeedback::new(child);

let p_by_1: Stream<_, ZSetHashMap<_, _>> = pvar.stream().index();
let p_by_2: Stream<_, ZSetHashMap<_, _>> =
pvar.stream().index_with(|&(x, y)| (y, x));
let p_by_12: Stream<_, ZSetHashMap<_, _>> =
pvar.stream().index_with(|&(x, y)| ((x, y), ()));
let u_by_1: Stream<_, ZSetHashMap<_, _>> =
u.delta0(child).index_with(|&(x, y, z)| (x, (y, z)));
let q_by_1: Stream<_, ZSetHashMap<_, _>> =
qvar.stream().index_with(|&(x, y, z)| (x, (y, z)));
let q_by_2: Stream<_, ZSetHashMap<_, _>> =
qvar.stream().index_with(|&(x, y, z)| (y, (x, z)));
let q_by_12: Stream<_, ZSetHashMap<_, _>> =
qvar.stream().index_with(|&(x, y, z)| ((x, y), z));
let q_by_23: Stream<_, ZSetHashMap<_, _>> =
qvar.stream().index_with(|&(x, y, z)| ((y, z), x));
let c_by_2: Stream<_, ZSetHashMap<_, _>> =
c.delta0(child).index_with(|&(x, y, z)| (y, (x, z)));
let r_by_1: Stream<_, ZSetHashMap<_, _>> =
r.delta0(child).index_with(|&(x, y, z)| (x, (y, z)));
let s_by_1: Stream<_, ZSetHashMap<_, _>> = s.delta0(child).index();
let p_by_1 = pvar.stream().index::<OrdIndexedZSet<_, _, _>>();
let p_by_2 = pvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y)| (y, x));
let p_by_12 = pvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y)| ((x, y), ()));
let u_by_1 = u
.delta0(child)
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| (x, (y, z)));
let q_by_1 = qvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| (x, (y, z)));
let q_by_2 = qvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| (y, (x, z)));
let q_by_12 = qvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| ((x, y), z));
let q_by_23 = qvar
.stream()
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| ((y, z), x));
let c_by_2 = c
.delta0(child)
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| (y, (x, z)));
let r_by_1 = r
.delta0(child)
.index_with::<OrdIndexedZSet<_, _, _>, _>(|&(x, y, z)| (x, (y, z)));
let s_by_1 = s.delta0(child).index::<OrdIndexedZSet<_, _, _>>();

// IR1: p(x,z) :- p(x,y), p(y,z).
let ir1 = child.region("IR1", || {
p_by_2.join_incremental_nested(&p_by_1, |&_y, &x, &z| (x, z))
});
ir1.inspect(|zs: &ZSetHashMap<_, _>| println!("ir1: {}", zs.support_size()));
let ir1 =
child.region("IR1", || p_by_2.join_trace(&p_by_1, |&_y, &x, &z| (x, z)));
ir1.inspect(|zs: &OrdZSet<_, _>| println!("ir1: {}", zs.len()));

// IR2: q(x,r,z) := p(x,y), q(y,r,z)
let ir2 = child.region("IR2", || {
p_by_2.join_incremental_nested(&q_by_1, |&_y, &x, &(r, z)| (x, r, z))
p_by_2.join_trace(&q_by_1, |&_y, &x, &(r, z)| (x, r, z))
});

ir2.inspect(|zs: &ZSetHashMap<_, _>| println!("ir2: {}", zs.support_size()));
ir2.inspect(|zs: &OrdZSet<_, _>| println!("ir2: {}", zs.len()));

// IR3: p(x,z) := p(y,w), u(w,r,z), q(x,r,y)
let ir3 = child.region("IR3", || {
p_by_2
.join_incremental_nested::<_, _, _, _, _, _, _, ZSetHashMap<_, _>>(
&u_by_1,
|&_w, &y, &(r, z)| ((r, y), z),
)
.index::<_, _, _, ZSetHashMap<_, _>>()
.join_incremental_nested(&q_by_23, |&(_r, _y), &z, &x| (x, z))
.join_trace::<_, _, OrdZSet<_, _>>(&u_by_1, |&_w, &y, &(r, z)| {
((r, y), z)
})
.index::<OrdIndexedZSet<_, _, _>>()
.join_trace(&q_by_23, |&(_r, _y), &z, &x| (x, z))
});
ir3.inspect(|zs: &ZSetHashMap<_, _>| println!("ir3: {}", zs.support_size()));
ir3.inspect(|zs: &OrdZSet<_, _>| println!("ir3: {}", zs.len()));

// IR4: p(x,z) := c(y,w,z), p(x,w), p(x,y)
let ir4_1 = child.region("IR4-1", || {
c_by_2.join_incremental_nested::<_, _, _, _, _, _, _, ZSetHashMap<_, _>>(
&p_by_2,
|&_w, &(y, z), &x| ((x, y), z),
)
c_by_2.join_trace::<_, _, OrdZSet<_, _>>(&p_by_2, |&_w, &(y, z), &x| {
((x, y), z)
})
});
ir4_1
.inspect(|zs: &ZSetHashMap<_, _>| println!("ir4_1: {}", zs.support_size()));
ir4_1.inspect(|zs: &OrdZSet<_, _>| println!("ir4_1: {}", zs.len()));

let ir4 = child.region("IR4-2", || {
ir4_1
.index::<_, _, _, ZSetHashMap<_, _>>()
.join_incremental_nested(&p_by_12, |&(x, _y), &z, &()| (x, z))
.index::<OrdIndexedZSet<_, _, _>>()
.join_trace(&p_by_12, |&(x, _y), &z, &()| (x, z))
});
ir4.inspect(|zs: &ZSetHashMap<_, _>| println!("ir4: {}", zs.support_size()));
ir4.inspect(|zs: &OrdZSet<_, _>| println!("ir4: {}", zs.len()));

// IR5: q(x,q,z) := q(x,r,z), s(r,q)
let ir5 = child.region("IR5", || {
q_by_2.join_incremental_nested(&s_by_1, |&_r, &(x, z), &q| (x, q, z))
q_by_2.join_trace(&s_by_1, |&_r, &(x, z), &q| (x, q, z))
});
ir5.inspect(|zs: &ZSetHashMap<_, _>| println!("ir5: {}", zs.support_size()));
ir5.inspect(|zs: &OrdZSet<_, _>| println!("ir5: {}", zs.len()));

// IR6: q(x,e,o) := q(x,y,z), r(y,u,e), q(z,u,o)
let ir6 = child.region("IR6", || {
let ir6_1 = child.region("IR6_1", || {
q_by_2
.join_incremental_nested::<_, _, _, _, _, _, _, ZSetHashMap<_, _>>(
&r_by_1,
|&_y, &(x, z), &(u, e)| ((z, u), (x, e)),
)
.index::<_, _, _, ZSetHashMap<_, _>>()
.join_incremental_nested(&q_by_12, |&(_z, _u), &(x, e), &o| (x, e, o))
.join_trace::<_, _, OrdZSet<_, _>>(&r_by_1, |&_y, &(x, z), &(u, e)| {
((z, u), (x, e))
})
.index::<OrdIndexedZSet<_, _, _>>()
});
let ir6 = child.region("IR6", || {
ir6_1.join_trace(&q_by_12, |&(_z, _u), &(x, e), &o| (x, e, o))
});
ir6.inspect(|zs: &ZSetHashMap<_, _>| println!("ir6: {}", zs.support_size()));

let p = p
.delta0(child)
.sum([&ir1, &ir3, &ir4])
.distinct_incremental_nested();
ir6.inspect(|zs: &OrdZSet<_, _>| println!("ir6: {}", zs.len()));

let q = q
.delta0(child)
.sum([&ir2, &ir5, &ir6])
.distinct_incremental_nested();
let p = p.delta0(child).sum([&ir1, &ir3, &ir4]).distinct_trace();

let q = q.delta0(child).sum([&ir2, &ir5, &ir6]).distinct_trace();

pvar.connect(&p);
qvar.connect(&q);

Ok((
vec![
p.condition(HasZero::is_zero),
p.integrate_nested().condition(HasZero::is_zero),
q.condition(HasZero::is_zero),
q.integrate_nested().condition(HasZero::is_zero),
],
(p.integrate().export(), q.integrate().export()),
p.integrate_trace::<OrdZSetSpine<_, _>>().export(),
q.integrate_trace::<OrdZSetSpine<_, _>>().export(),
))
})
.unwrap();
outp.inspect(|zs: &ZSetHashMap<_, _>| println!("outp: {}", zs.support_size()));
outq.inspect(|zs: &ZSetHashMap<_, _>| println!("outq: {}", zs.support_size()));
outp.consolidate::<OrdZSet<_, _>>()
.inspect(|zs: &OrdZSet<_, _>| println!("outp: {}", zs.len()));
outq.consolidate::<OrdZSet<_, _>>()
.inspect(|zs: &OrdZSet<_, _>| println!("outq: {}", zs.len()));
})
.unwrap();

Expand Down
Loading