From 2527095792ebfa5db1e8795663018e7558fbb2ff Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 26 Dec 2024 19:32:20 -0500 Subject: [PATCH] Demonstrate container input batching --- examples/columnar.rs | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/examples/columnar.rs b/examples/columnar.rs index 81c39eec8..637099668 100644 --- a/examples/columnar.rs +++ b/examples/columnar.rs @@ -1,7 +1,7 @@ //! Wordcount based on `columnar`. use { - timely::container::CapacityContainerBuilder, + timely::container::{Container, CapacityContainerBuilder}, timely::dataflow::channels::pact::ExchangeCore, timely::dataflow::InputHandleCore, timely::dataflow::ProbeHandle, @@ -43,8 +43,8 @@ fn main() { let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); - let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); - let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); + let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); + let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); @@ -54,6 +54,11 @@ fn main() { }); + // Resources for placing input data in containers. + use std::fmt::Write; + let mut buffer = String::default(); + let mut container = Container::default(); + // Load up data in batches. let mut counter = 0; while counter < 10 * keys { @@ -61,9 +66,13 @@ fn main() { let time = *data_input.time(); while i < size { let val = (counter + i) % keys; - data_input.send(((&format!("{:?}", val), ()), time, 1)); + write!(buffer, "{:?}", val).unwrap(); + container.push(((&buffer, ()), time, 1)); + buffer.clear(); i += worker.peers(); } + data_input.send_batch(&mut container); + container.clear(); counter += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); @@ -80,9 +89,13 @@ fn main() { let time = *data_input.time(); while i < size { let val = (queries + i) % keys; - data_input.send(((&format!("{:?}", val), ()), time, 1)); + write!(buffer, "{:?}", val).unwrap(); + container.push(((&buffer, ()), time, 1)); + buffer.clear(); i += worker.peers(); } + data_input.send_batch(&mut container); + container.clear(); queries += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); @@ -293,7 +306,7 @@ mod builder { let words = self.current.borrow().length_in_words(); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { - let mut alloc = Vec::with_capacity(round); + let mut alloc = Vec::with_capacity(8 * words); columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); self.pending.push_back(Column::Align(alloc.into_boxed_slice())); self.current.clear(); @@ -328,7 +341,12 @@ mod builder { #[inline] fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { - self.pending.push_back(Column::Typed(std::mem::take(&mut self.current))); + use columnar::Container; + let words = self.current.borrow().length_in_words(); + let mut alloc = Vec::with_capacity(8 * words); + columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes()); + self.pending.push_back(Column::Align(alloc.into_boxed_slice())); + self.current.clear(); } self.empty = self.pending.pop_front(); self.empty.as_mut()