Skip to content

Commit

Permalink
re-enable simpler broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Feb 11, 2019
1 parent 55d9940 commit ca40906
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 84 deletions.
175 changes: 93 additions & 82 deletions src/dataflow/operators/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
//! Broadcast records to all workers.

use std::rc::Rc;
use std::cell::RefCell;
// use std::rc::Rc;
// use std::cell::RefCell;

use communication::Pull;
// use communication::Pull;

use ::ExchangeData;
use progress::{Source, Target};
use progress::{Timestamp, Operate, operate::{Schedule, SharedProgress}, Antichain};
// use progress::{Source, Target};
// use progress::{Timestamp, Operate, operate::{Schedule, SharedProgress}, Antichain};
use dataflow::{Stream, Scope};
use dataflow::channels::{Message, Bundle};
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use dataflow::channels::pushers::Tee;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pact::{LogPusher, LogPuller};
use dataflow::operators::{Map, Exchange};

// use dataflow::channels::{Message, Bundle};
// use dataflow::channels::pushers::Counter as PushCounter;
// use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
// use dataflow::channels::pushers::Tee;
// use dataflow::channels::pullers::Counter as PullCounter;
// use dataflow::channels::pact::{LogPusher, LogPuller};

/// Broadcast records to all workers.
pub trait Broadcast<D: ExchangeData> {
Expand All @@ -35,89 +37,98 @@ pub trait Broadcast<D: ExchangeData> {

// NOTE(review): the body of `broadcast` below contains two implementations back to back:
// the short `flat_map` / `exchange` / `map` chain, and the wiring of a hand-built
// `BroadcastOperator`. The latter is also repeated as commented-out lines. Confirm which
// of the two is meant to be live before relying on this text.
impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
fn broadcast(&self) -> Stream<G, D> {
let mut scope = self.scope();

let channel_id = scope.new_identifier();
// NOTE: Simplified implementation due to underlying motion
// in timely dataflow internals. Optimize once they have
// settled down.
// Pair a clone of every record with each index in `0 .. peers`, exchange on that
// index, then drop the index again so only the record remains.
// (Presumably `exchange` delivers index `i` to worker `i`; confirm against `Exchange`.)
let peers = self.scope().peers() as u64;
self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone())))
.exchange(|ix| ix.0)
.map(|(_i,x)| x)

// Channel endpoints for `channel_id`, plus the output `Tee` and its registrar.
let (pushers, puller) = scope.allocate::<Message<G::Timestamp, D>>(channel_id);
let (targets, registrar) = Tee::<G::Timestamp, D>::new();
// let mut scope = self.scope();

// The wiring below connects one input port per pusher, so there must be one pusher per peer.
assert_eq!(pushers.len(), scope.peers());
// let channel_id = scope.new_identifier();

let receiver = LogPuller::new(puller, scope.index(), channel_id, scope.logging());
// let (pushers, puller) = scope.allocate::<Message<G::Timestamp, D>>(channel_id);
// let (targets, registrar) = Tee::<G::Timestamp, D>::new();

// The operator's address is this stream's address extended by the new operator index.
let operator_index = self.allocate_operator_index();
let mut address = self.addr();
address.push(operator_index);
// assert_eq!(pushers.len(), scope.peers());

// Shared progress is sized for `peers` inputs and one output.
let operator = BroadcastOperator {
address,
shared_progress: Rc::new(RefCell::new(SharedProgress::new(scope.peers(), 1))),
index: scope.index(),
peers: scope.peers(),
input: PullCounter::new(receiver),
output: PushBuffer::new(PushCounter::new(targets)),
};
// let receiver = LogPuller::new(puller, scope.index(), channel_id, scope.logging());

scope.add_operator_with_index(Box::new(operator), operator_index);
// let operator_index = self.allocate_operator_index();
// let mut address = self.addr();
// address.push(operator_index);

// Connect this stream to input port `i` of the operator, once per pusher.
for (i, pusher) in pushers.into_iter().enumerate() {
let sender = LogPusher::new(pusher, scope.index(), i, channel_id, scope.logging());
self.connect_to(Target { index: operator_index, port: i }, sender, channel_id);
}
// let operator = BroadcastOperator {
// address,
// shared_progress: Rc::new(RefCell::new(SharedProgress::new(scope.peers(), 1))),
// index: scope.index(),
// peers: scope.peers(),
// input: PullCounter::new(receiver),
// output: PushBuffer::new(PushCounter::new(targets)),
// };

// The resulting stream is output port 0 of the new operator.
Stream::new(Source { index: operator_index, port: 0 }, registrar, scope)
}
}
// scope.add_operator_with_index(Box::new(operator), operator_index);

/// State of the hand-written broadcast operator; its `Schedule` and `Operate`
/// implementations read and update these fields.
struct BroadcastOperator<T: Timestamp, D: ExchangeData> {
/// Selects the entry of `shared_progress.consumeds` that `schedule` drains consumed counts into.
index: usize,
/// Number of inputs reported by `Operate::inputs`, and the number of rows in the internal summary.
peers: usize,
/// Address returned (as a slice) by `Schedule::path`.
address: Vec<usize>,
/// Progress state updated by `schedule`; a clone of this handle is returned from `get_internal_summary`.
shared_progress: Rc<RefCell<SharedProgress<T>>>,
/// Counted, logged puller from which `schedule` reads incoming bundles.
input: PullCounter<T, D, LogPuller<T, D, Box<Pull<Bundle<T, D>>>>>,
/// Buffered, counted pusher into the output `Tee`, to which `schedule` writes records.
output: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
}
// for (i, pusher) in pushers.into_iter().enumerate() {
// let sender = LogPusher::new(pusher, scope.index(), i, channel_id, scope.logging());
// self.connect_to(Target { index: operator_index, port: i }, sender, channel_id);
// }

// Scheduling for the broadcast operator: forwards every received bundle to the single
// output and then reports consumed/produced counts through the shared progress state.
impl<T: Timestamp, D: ExchangeData> Schedule for BroadcastOperator<T, D> {
/// The fixed operator name, `"Broadcast"`.
fn name(&self) -> &str { "Broadcast" }
/// The operator's address, borrowed as a slice.
fn path(&self) -> &[usize] { &self.address[..] }
/// Drains the input, re-emitting each bundle's records at the bundle's time, then moves
/// the input's consumed counts and the output's produced counts into `shared_progress`.
///
/// Always returns `false`.
fn schedule(&mut self) -> bool {

// Scratch vector for bundles that are only available by shared reference.
let mut vec = Vec::new();
while let Some(bundle) = self.input.next() {

use communication::message::RefOrMut;

match bundle.as_ref_or_mut() {
RefOrMut::Ref(bundle) => {
// Only a shared reference is available: `swap` fills `vec` from the borrowed data
// (presumably by cloning; confirm against `RefOrMut::swap`), and `vec` is sent.
RefOrMut::Ref(&bundle.data).swap(&mut vec);
self.output.session(&bundle.time).give_vec(&mut vec);
},
RefOrMut::Mut(bundle) => {
// Mutable access: the bundle's own data vector is handed to the output directly.
self.output.session(&bundle.time).give_vec(&mut bundle.data);
},
}
}
// Presumably flushes anything still buffered in the output; confirm against `Buffer::cease`.
self.output.cease();

// Consumed counts go to this operator's `index` slot; produced counts go to output 0.
let shared_progress = &mut *self.shared_progress.borrow_mut();
self.input.consumed().borrow_mut().drain_into(&mut shared_progress.consumeds[self.index]);
self.output.inner().produced().borrow_mut().drain_into(&mut shared_progress.produceds[0]);
false
// Stream::new(Source { index: operator_index, port: 0 }, registrar, scope)
}
}

// Shape and progress summary of the broadcast operator: `peers` inputs, one output.
impl<T: Timestamp, D: ExchangeData> Operate<T> for BroadcastOperator<T, D> {
/// Number of inputs: one per peer.
fn inputs(&self) -> usize { self.peers }
/// Number of outputs: always one.
fn outputs(&self) -> usize { 1 }

/// Returns the internal path summary together with a clone of the shared progress handle.
///
/// The summary has one row per input (`peers` rows); each row holds a single antichain
/// containing the default summary value, for the operator's one output.
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
// TODO: (optimization) some of these internal paths do not actually exist
let summary = (0..self.peers).map(|_| vec![Antichain::from_elem(Default::default())]).collect::<Vec<_>>();
(summary, self.shared_progress.clone())
}

/// This operator does not ask to be notified.
fn notify_me(&self) -> bool { false }
}
// struct BroadcastOperator<T: Timestamp, D: ExchangeData> {
// index: usize,
// peers: usize,
// address: Vec<usize>,
// shared_progress: Rc<RefCell<SharedProgress<T>>>,
// input: PullCounter<T, D, LogPuller<T, D, Box<Pull<Bundle<T, D>>>>>,
// output: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
// }

// impl<T: Timestamp, D: ExchangeData> Schedule for BroadcastOperator<T, D> {
// fn name(&self) -> &str { "Broadcast" }
// fn path(&self) -> &[usize] { &self.address[..] }
// fn schedule(&mut self) -> bool {

// let mut vec = Vec::new();
// while let Some(bundle) = self.input.next() {

// use communication::message::RefOrMut;

// match bundle.as_ref_or_mut() {
// RefOrMut::Ref(bundle) => {
// RefOrMut::Ref(&bundle.data).swap(&mut vec);
// self.output.session(&bundle.time).give_vec(&mut vec);
// },
// RefOrMut::Mut(bundle) => {
// self.output.session(&bundle.time).give_vec(&mut bundle.data);
// },
// }
// }
// self.output.cease();

// let shared_progress = &mut *self.shared_progress.borrow_mut();
// self.input.consumed().borrow_mut().drain_into(&mut shared_progress.consumeds[self.index]);
// self.output.inner().produced().borrow_mut().drain_into(&mut shared_progress.produceds[0]);
// false
// }
// }

// impl<T: Timestamp, D: ExchangeData> Operate<T> for BroadcastOperator<T, D> {
// fn inputs(&self) -> usize { self.peers }
// fn outputs(&self) -> usize { 1 }

// fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
// // TODO: (optimization) some of these internal paths do not actually exist
// let summary = (0..self.peers).map(|_| vec![Antichain::from_elem(Default::default())]).collect::<Vec<_>>();
// (summary, self.shared_progress.clone())
// }

// fn notify_me(&self) -> bool { false }
// }

4 changes: 2 additions & 2 deletions src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use self::inspect::Inspect;
pub use self::filter::Filter;
pub use self::delay::Delay;
pub use self::exchange::Exchange;
// pub use self::broadcast::Broadcast;
pub use self::broadcast::Broadcast;
pub use self::probe::Probe;
pub use self::to_stream::ToStream;
pub use self::capture::Capture;
Expand All @@ -44,7 +44,7 @@ pub mod inspect;
pub mod filter;
pub mod delay;
pub mod exchange;
// pub mod broadcast;
pub mod broadcast;
pub mod probe;
pub mod to_stream;
pub mod capture;
Expand Down

0 comments on commit ca40906

Please sign in to comment.