Skip to content

Commit

Permalink
Demonstrate container input batching (#556)
Browse files (browse the repository at this point in the history)
  • Loading branch information
frankmcsherry authored Dec 27, 2024
1 parent 5976a2f commit da18a35
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions examples/columnar.rs
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
//! Wordcount based on `columnar`.
use {
timely::container::CapacityContainerBuilder,
timely::container::{Container, CapacityContainerBuilder},
timely::dataflow::channels::pact::ExchangeCore,
timely::dataflow::InputHandleCore,
timely::dataflow::ProbeHandle,
Expand Down Expand Up @@ -43,8 +43,8 @@ fn main() {
let data = data_input.to_stream(scope);
let keys = keys_input.to_stream(scope);

let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::<u8>() as u64);
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::<u8>() as u64);
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);

let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
Expand All @@ -54,16 +54,25 @@ fn main() {

});

// Resources for placing input data in containers.
use std::fmt::Write;
let mut buffer = String::default();
let mut container = Container::default();

// Load up data in batches.
let mut counter = 0;
while counter < 10 * keys {
let mut i = worker.index();
let time = *data_input.time();
while i < size {
let val = (counter + i) % keys;
data_input.send(((&format!("{:?}", val), ()), time, 1));
write!(buffer, "{:?}", val).unwrap();
container.push(((&buffer, ()), time, 1));
buffer.clear();
i += worker.peers();
}
data_input.send_batch(&mut container);
container.clear();
counter += size;
data_input.advance_to(data_input.time() + 1);
keys_input.advance_to(keys_input.time() + 1);
Expand All @@ -80,9 +89,13 @@ fn main() {
let time = *data_input.time();
while i < size {
let val = (queries + i) % keys;
data_input.send(((&format!("{:?}", val), ()), time, 1));
write!(buffer, "{:?}", val).unwrap();
container.push(((&buffer, ()), time, 1));
buffer.clear();
i += worker.peers();
}
data_input.send_batch(&mut container);
container.clear();
queries += size;
data_input.advance_to(data_input.time() + 1);
keys_input.advance_to(keys_input.time() + 1);
Expand Down Expand Up @@ -293,7 +306,7 @@ mod builder {
let words = self.current.borrow().length_in_words();
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
let mut alloc = Vec::with_capacity(round);
let mut alloc = Vec::with_capacity(8 * words);
columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes());
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
self.current.clear();
Expand Down Expand Up @@ -328,7 +341,12 @@ mod builder {
#[inline]
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
self.pending.push_back(Column::Typed(std::mem::take(&mut self.current)));
use columnar::Container;
let words = self.current.borrow().length_in_words();
let mut alloc = Vec::with_capacity(8 * words);
columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes());
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
self.current.clear();
}
self.empty = self.pending.pop_front();
self.empty.as_mut()
Expand Down

0 comments on commit da18a35

Please sign in to comment.