Skip to content

Commit

Permalink
Add a test for rust-lang#2896
Browse files Browse the repository at this point in the history
  • Loading branch information
topecongiro committed Aug 5, 2018
1 parent da17b68 commit c19569c
Show file tree
Hide file tree
Showing 2 changed files with 324 additions and 0 deletions.
161 changes: 161 additions & 0 deletions tests/source/issue-2896.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
extern crate rand;
extern crate timely;
extern crate differential_dataflow;

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::operators::*;

use differential_dataflow::AsCollection;
use differential_dataflow::operators::*;
use differential_dataflow::input::InputSession;

// mod loglikelihoodratio;

fn main() {

// define a new timely dataflow computation.
timely::execute_from_args(std::env::args().skip(6), move |worker| {

// capture parameters of the experiment.
let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";

let index = worker.index();
let peers = worker.peers();

let (input, probe) = worker.dataflow(|scope| {

// input of (user, item) collection.
let (input, occurrences) = scope.new_input();
let occurrences = occurrences.as_collection();

//TODO adjust code to only work with upper triangular half of cooccurrence matrix

/* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
let cooccurrences =
occurrences
.join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
.filter(|&(item_a, item_b)| item_a != item_b)
.count();

/* compute the rowsums of C indicating how often we encounter individual items. */
let row_sums =
occurrences
.map(|(_user, item)| item)
.count();

// row_sums.inspect(|record| println!("[row_sums] {:?}", record));

/* Join the cooccurrence pairs with the corresponding row sums. */
let mut cooccurrences_with_row_sums = cooccurrences
.map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
.join_map(&row_sums, |&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
assert!(row_sum_a > 0);
(item_b, (item_a, num_cooccurrences, row_sum_a))
})
.join_map(&row_sums, |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
assert!(row_sum_a > 0);
assert!(row_sum_b > 0);
(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
});

// cooccurrences_with_row_sums
// .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));

// //TODO compute top-k "similar items" per item
// /* Compute LLR scores for each item pair. */
// let llr_scores = cooccurrences_with_row_sums.map(
// |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {

// println!(
// "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
// item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);

// let k11: isize = num_cooccurrences;
// let k12: isize = row_sum_a as isize - k11;
// let k21: isize = row_sum_b as isize - k11;
// let k22: isize = 10000 - k12 - k21 + k11;

// let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);

// ((item_a, item_b), llr_score)
// });

if noisy {
cooccurrences_with_row_sums =
cooccurrences_with_row_sums
.inspect(|x| println!("change: {:?}", x));
}

let probe =
cooccurrences_with_row_sums
.probe();
/*
// produce the (item, item) collection
let cooccurrences = occurrences
.join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
// count the occurrences of each item.
let counts = cooccurrences
.map(|(item_a,_)| item_a)
.count();
// produce ((item1, item2), count1, count2, count12) tuples
let cooccurrences_with_counts = cooccurrences
.join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
.join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
((item_a, item_b), count_item_a, count_item_b)
});
let probe = cooccurrences_with_counts
.inspect(|x| println!("change: {:?}", x))
.probe();
*/
(input, probe)
});

let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions

let mut input = InputSession::from(input);

for count in 0 .. scale {
if count % peers == index {
let user = rng1.gen_range(0, users);
let item = rng1.gen_range(0, items);
// println!("[INITIAL INPUT] ({}, {})", user, item);
input.insert((user, item));
}
}

// load the initial data up!
while probe.less_than(input.time()) { worker.step(); }

for round in 1 .. {

for element in (round * batch) .. ((round + 1) * batch) {
if element % peers == index {
// advance the input timestamp.
input.advance_to(round * batch);
// insert a new item.
let user = rng1.gen_range(0, users);
let item = rng1.gen_range(0, items);
if noisy { println!("[INPUT: insert] ({}, {})", user, item); }
input.insert((user, item));
// remove an old item.
let user = rng2.gen_range(0, users);
let item = rng2.gen_range(0, items);
if noisy { println!("[INPUT: remove] ({}, {})", user, item); }
input.remove((user, item));
}
}

input.advance_to(round * batch);
input.flush();

while probe.less_than(input.time()) { worker.step(); }
}
}).unwrap();
}
163 changes: 163 additions & 0 deletions tests/target/issue-2896.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
extern crate differential_dataflow;
extern crate rand;
extern crate timely;

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::operators::*;

use differential_dataflow::input::InputSession;
use differential_dataflow::operators::*;
use differential_dataflow::AsCollection;

// mod loglikelihoodratio;

fn main() {
// define a new timely dataflow computation.
timely::execute_from_args(std::env::args().skip(6), move |worker| {
// capture parameters of the experiment.
let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";

let index = worker.index();
let peers = worker.peers();

let (input, probe) = worker.dataflow(|scope| {
// input of (user, item) collection.
let (input, occurrences) = scope.new_input();
let occurrences = occurrences.as_collection();

//TODO adjust code to only work with upper triangular half of cooccurrence matrix

/* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
let cooccurrences = occurrences
.join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
.filter(|&(item_a, item_b)| item_a != item_b)
.count();

/* compute the rowsums of C indicating how often we encounter individual items. */
let row_sums = occurrences.map(|(_user, item)| item).count();

// row_sums.inspect(|record| println!("[row_sums] {:?}", record));

/* Join the cooccurrence pairs with the corresponding row sums. */
let mut cooccurrences_with_row_sums = cooccurrences
.map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
.join_map(
&row_sums,
|&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
assert!(row_sum_a > 0);
(item_b, (item_a, num_cooccurrences, row_sum_a))
},
).join_map(
&row_sums,
|&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
assert!(row_sum_a > 0);
assert!(row_sum_b > 0);
(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
},
);

// cooccurrences_with_row_sums
// .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));

// //TODO compute top-k "similar items" per item
// /* Compute LLR scores for each item pair. */
// let llr_scores = cooccurrences_with_row_sums.map(
// |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {

// println!(
// "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
// item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);

// let k11: isize = num_cooccurrences;
// let k12: isize = row_sum_a as isize - k11;
// let k21: isize = row_sum_b as isize - k11;
// let k22: isize = 10000 - k12 - k21 + k11;

// let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);

// ((item_a, item_b), llr_score)
// });

if noisy {
cooccurrences_with_row_sums =
cooccurrences_with_row_sums.inspect(|x| println!("change: {:?}", x));
}

let probe = cooccurrences_with_row_sums.probe();
/*
// produce the (item, item) collection
let cooccurrences = occurrences
.join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
// count the occurrences of each item.
let counts = cooccurrences
.map(|(item_a,_)| item_a)
.count();
// produce ((item1, item2), count1, count2, count12) tuples
let cooccurrences_with_counts = cooccurrences
.join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
.join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
((item_a, item_b), count_item_a, count_item_b)
});
let probe = cooccurrences_with_counts
.inspect(|x| println!("change: {:?}", x))
.probe();
*/
(input, probe)
});

let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions

let mut input = InputSession::from(input);

for count in 0..scale {
if count % peers == index {
let user = rng1.gen_range(0, users);
let item = rng1.gen_range(0, items);
// println!("[INITIAL INPUT] ({}, {})", user, item);
input.insert((user, item));
}
}

// load the initial data up!
while probe.less_than(input.time()) {
worker.step();
}

for round in 1.. {
for element in (round * batch)..((round + 1) * batch) {
if element % peers == index {
// advance the input timestamp.
input.advance_to(round * batch);
// insert a new item.
let user = rng1.gen_range(0, users);
let item = rng1.gen_range(0, items);
if noisy {
println!("[INPUT: insert] ({}, {})", user, item);
}
input.insert((user, item));
// remove an old item.
let user = rng2.gen_range(0, users);
let item = rng2.gen_range(0, items);
if noisy {
println!("[INPUT: remove] ({}, {})", user, item);
}
input.remove((user, item));
}
}

input.advance_to(round * batch);
input.flush();

while probe.less_than(input.time()) {
worker.step();
}
}
}).unwrap();
}

0 comments on commit c19569c

Please sign in to comment.