Skip to content

Commit

Permalink
Timeless Attributes (#67)
Browse files Browse the repository at this point in the history
Allows attributes to indicate that they do not care about time. Such attributes want new inputs to be immediately visible to the entire system. The tricky part is not having such attributes stall everyone else (e.g. because they keep a capability for t0) or prevent compaction (again by keeping a capability for event t0 in bitemporal mode).

* Rename Domain::input_probe -> domain_probe
* Extend `AttributeConfig` to specify timelessness
* Add domain_probe to SourcingContext
  • Loading branch information
David Bach authored and comnik committed Apr 27, 2019
1 parent 2b73bae commit 93f4c81
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 15 deletions.
23 changes: 16 additions & 7 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Domain<T: Timestamp + Lattice> {
/// Input handles to attributes in this domain.
input_sessions: HashMap<String, UnorderedSession<T, (Value, Value), isize>>,
/// The probe keeping track of source progress in this domain.
input_probe: ProbeHandle<T>,
domain_probe: ProbeHandle<T>,
/// Configurations for attributes in this domain.
pub attributes: HashMap<Aid, AttributeConfig>,
/// Forward attribute indices eid -> v.
Expand All @@ -50,7 +50,7 @@ where
Domain {
now_at: start_at,
input_sessions: HashMap::new(),
input_probe: ProbeHandle::new(),
domain_probe: ProbeHandle::new(),
attributes: HashMap::new(),
forward: HashMap::new(),
reverse: HashMap::new(),
Expand Down Expand Up @@ -132,9 +132,18 @@ where
) -> Result<(), Error> {
// We need to install a probe on source-fed attributes in
// order to determine their progress.
let probed_pairs = pairs.probe_with(&mut self.input_probe);

self.create_attribute(name, config, &probed_pairs)?;
// We do not want to probe timeless attributes.
// Sources of timeless attributes either are not able to or do not
// want to provide valid domain timestamps.
// Forcing to probe them would stall progress in the system.
let source_pairs = if config.timeless {
pairs.to_owned()
} else {
pairs.probe_with(&mut self.domain_probe)
};

self.create_attribute(name, config, &source_pairs)?;

Ok(())
}
Expand Down Expand Up @@ -243,7 +252,7 @@ where
/// Advances all handles of the domain to its current frontier.
pub fn advance_domain_to_source(&mut self) -> Result<(), Error> {
let frontier = self
.input_probe
.domain_probe
.with_frontier(|frontier| (*frontier).to_vec());
self.advance_by(&frontier)
}
Expand Down Expand Up @@ -320,7 +329,7 @@ where
}

/// Returns a handle to the domain's input probe.
pub fn input_probe(&self) -> &ProbeHandle<T> {
&self.input_probe
pub fn domain_probe(&self) -> &ProbeHandle<T> {
&self.domain_probe
}
}
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ pub struct AttributeConfig {
/// How close indexed traces should follow the computation
/// frontier.
pub trace_slack: Option<Time>,
/// Does this attribute care about its respective time
/// dimension? Timeless attributes do not have an
/// influence on the overall progress in the system.
pub timeless: bool,
}

impl AttributeConfig {
Expand All @@ -317,6 +321,7 @@ impl AttributeConfig {
// s.t. traces advance to t+1 when we're still accepting
// inputs for t+1.
trace_slack: Some(Time::TxId(1)),
timeless: false,
}
}

Expand All @@ -327,6 +332,7 @@ impl AttributeConfig {
AttributeConfig {
input_semantics,
trace_slack: Some(Time::Real(Duration::from_secs(0))),
timeless: false,
}
}

Expand All @@ -336,6 +342,7 @@ impl AttributeConfig {
AttributeConfig {
input_semantics,
trace_slack: None,
timeless: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ where
let context = SourcingContext {
t0: self.t0,
scheduler: Rc::downgrade(&self.scheduler),
domain_probe: self.context.internal.domain_probe().clone(),
timely_events: self.timely_events.clone().unwrap(),
differential_events: self.differential_events.clone().unwrap(),
};
Expand Down
2 changes: 1 addition & 1 deletion src/sources/csv_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<S: Scope<Timestamp = Duration>> Sourceable<S> for CsvFile {
fn source(
&self,
scope: &mut S,
context: SourcingContext,
context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand Down
2 changes: 1 addition & 1 deletion src/sources/differential_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<S: Scope<Timestamp = Duration>> Sourceable<S> for DifferentialLogging {
fn source(
&self,
scope: &mut S,
context: SourcingContext,
context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand Down
12 changes: 7 additions & 5 deletions src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::rc::{Rc, Weak};
use std::time::{Duration, Instant};

use timely::dataflow::operators::capture::event::link::EventLink;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::{ProbeHandle, Scope, Stream};
use timely::logging::TimelyEvent;
use timely::progress::Timestamp;

Expand All @@ -28,10 +28,12 @@ pub use self::csv_file::CsvFile;
// pub use self::json_file::JsonFile;

/// A struct encapsulating any state required to create sources.
pub struct SourcingContext {
pub struct SourcingContext<T: Timestamp> {
/// The logical start of the computation, used by sources to
/// compute their relative progress.
pub t0: Instant,
/// A handle to the timely probe of the domain this source is created in.
pub domain_probe: ProbeHandle<T>,
/// A weak handle to a scheduler, used by sources to defer their
/// next activation when polling.
pub scheduler: Weak<RefCell<Scheduler>>,
Expand All @@ -52,7 +54,7 @@ where
fn source(
&self,
scope: &mut S,
context: SourcingContext,
context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand Down Expand Up @@ -81,7 +83,7 @@ impl<S: Scope<Timestamp = Duration>> Sourceable<S> for Source {
fn source(
&self,
scope: &mut S,
context: SourcingContext,
context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand All @@ -103,7 +105,7 @@ impl<S: Scope<Timestamp = u64>> Sourceable<S> for Source {
fn source(
&self,
_scope: &mut S,
_context: SourcingContext,
_context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand Down
2 changes: 1 addition & 1 deletion src/sources/timely_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<S: Scope<Timestamp = Duration>> Sourceable<S> for TimelyLogging {
fn source(
&self,
scope: &mut S,
context: SourcingContext,
context: SourcingContext<S::Timestamp>,
) -> Vec<(
Aid,
AttributeConfig,
Expand Down

0 comments on commit 93f4c81

Please sign in to comment.