Skip to content

Commit

Permalink
Move capture/ to core::capture/ (#557)
Browse files Browse the repository at this point in the history
* Remove non-Core capture logic

* Update Capture

* comment/doctest clean-up
  • Loading branch information
frankmcsherry authored Mar 23, 2024
1 parent 52e9245 commit fc50754
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 242 deletions.
2 changes: 1 addition & 1 deletion timely/examples/capture_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
.collect::<Vec<_>>()
.into_iter()
.map(|l| l.incoming().next().unwrap().unwrap())
.map(|r| EventReader::<_,u64,_>::new(r))
.map(|r| EventReader::<_,Vec<u64>,_>::new(r))
.collect::<Vec<_>>();

worker.dataflow::<u64,_,_>(|scope| {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/logging-recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
.collect::<Vec<_>>()
.into_iter()
.map(|l| l.incoming().next().unwrap().unwrap())
.map(|r| EventReader::<Duration,(Duration,TimelySetup,TimelyEvent),_>::new(r))
.map(|r| EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new(r))
.collect::<Vec<_>>();

worker.dataflow(|scope| {
Expand Down
131 changes: 0 additions & 131 deletions timely/src/dataflow/operators/capture/extract.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::Container;
use crate::progress::ChangeBatch;
use crate::progress::Timestamp;

use super::{EventCore, EventPusherCore};
use super::{Event, EventPusher};

/// Capture a stream of timestamped data for later replay.
pub trait Capture<T: Timestamp, C: Container> {
Expand All @@ -30,7 +30,7 @@ pub trait Capture<T: Timestamp, C: Container> {
/// use std::sync::{Arc, Mutex};
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Capture, ToStream, Inspect};
/// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract};
/// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
///
/// // get send and recv endpoints, wrap send to share
/// let (send, recv) = ::std::sync::mpsc::channel();
Expand All @@ -42,7 +42,7 @@ pub trait Capture<T: Timestamp, C: Container> {
/// let send = send.lock().unwrap().clone();
///
/// // these are to capture/replay the stream.
/// let handle1 = Rc::new(EventLinkCore::new());
/// let handle1 = Rc::new(EventLink::new());
/// let handle2 = Some(handle1.clone());
///
/// worker.dataflow::<u64,_,_>(|scope1|
Expand Down Expand Up @@ -95,26 +95,26 @@ pub trait Capture<T: Timestamp, C: Container> {
/// );
///
/// worker.dataflow::<u64,_,_>(|scope2| {
/// Some(EventReader::<_,u64,_>::new(recv))
/// Some(EventReader::<_,Vec<u64>,_>::new(recv))
/// .replay_into(scope2)
/// .capture_into(send0)
/// });
/// }).unwrap();
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// ```
fn capture_into<P: EventPusherCore<T, C>+'static>(&self, pusher: P);
fn capture_into<P: EventPusher<T, C>+'static>(&self, pusher: P);

/// Captures a stream using Rust's MPSC channels.
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, C>> {
fn capture(&self) -> ::std::sync::mpsc::Receiver<Event<T, C>> {
let (send, recv) = ::std::sync::mpsc::channel();
self.capture_into(send);
recv
}
}

impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
fn capture_into<P: EventPusherCore<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand All @@ -131,7 +131,7 @@ impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
if !progress.frontiers[0].is_empty() {
// transmit any frontier progress.
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
event_pusher.push(EventCore::Progress(to_send.into_inner()));
event_pusher.push(Event::Progress(to_send.into_inner()));
}

use crate::communication::message::RefOrMut;
Expand All @@ -143,7 +143,7 @@ impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)),
};
let vector = data.replace(Default::default());
event_pusher.push(EventCore::Messages(time.clone(), vector));
event_pusher.push(Event::Messages(time.clone(), vector));
}
input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
false
Expand Down
Loading

0 comments on commit fc50754

Please sign in to comment.