Skip to content

Commit 0c52513

Browse files
Validate timestamp summary before forming capability (#497)
1 parent 6a73600 commit 0c52513

File tree

3 files changed

+54
-21
lines changed

3 files changed

+54
-21
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30+
use crate::progress::Antichain;
3031
use crate::progress::Timestamp;
3132
use crate::progress::ChangeBatch;
3233
use crate::scheduling::Activations;
@@ -223,6 +224,7 @@ impl Display for DowngradeError {
223224

224225
impl Error for DowngradeError {}
225226

227+
/// A shared list of shared output capability buffers.
226228
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
227229

228230
/// A capability of an input port. Holding onto this capability will implicitly hold onto a
@@ -232,25 +234,32 @@ type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
232234
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
233235
/// capability and turns it into a [Capability] for a specific output port.
234236
pub struct InputCapability<T: Timestamp> {
237+
/// Output capability buffers, for use in minting capabilities.
235238
internal: CapabilityUpdates<T>,
239+
/// Timestamp summaries for each output.
240+
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
236241
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
237242
consumed_guard: ConsumedGuard<T>,
238243
}
239244

240245
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
241246
fn time(&self) -> &T { self.time() }
242247
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
243-
// let borrow = ;
244-
self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer))
248+
let borrow = self.summaries.borrow();
249+
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
250+
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
251+
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
252+
})
245253
}
246254
}
247255

248256
impl<T: Timestamp> InputCapability<T> {
249257
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
250258
/// the provided [`ChangeBatch`].
251-
pub(crate) fn new(internal: CapabilityUpdates<T>, guard: ConsumedGuard<T>) -> Self {
259+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
252260
InputCapability {
253261
internal,
262+
summaries,
254263
consumed_guard: guard,
255264
}
256265
}
@@ -270,15 +279,11 @@ impl<T: Timestamp> InputCapability<T> {
270279

271280
/// Delays capability for a specific output port.
272281
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
273-
// TODO : Test operator summary?
274-
if !self.time().less_equal(new_time) {
275-
panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time);
276-
}
277-
if output_port < self.internal.borrow().len() {
282+
use crate::progress::timestamp::PathSummary;
283+
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
278284
Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone())
279-
}
280-
else {
281-
panic!("Attempted to acquire a capability for a non-existent output port.");
285+
} else {
286+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
282287
}
283288
}
284289

@@ -287,18 +292,23 @@ impl<T: Timestamp> InputCapability<T> {
287292
/// This method produces an owned capability which must be dropped to release the
288293
/// capability. Users should take care that these capabilities are only stored for
289294
/// as long as they are required, as failing to drop them may result in livelock.
295+
///
296+
/// This method panics if the timestamp summary to output zero strictly advances the time.
290297
pub fn retain(self) -> Capability<T> {
291-
// mint(self.time.clone(), self.internal)
292298
self.retain_for_output(0)
293299
}
294300

295301
/// Transforms to an owned capability for a specific output port.
302+
///
303+
/// This method panics if the timestamp summary to `output_port` strictly advances the time.
296304
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
297-
if output_port < self.internal.borrow().len() {
298-
Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone())
305+
use crate::progress::timestamp::PathSummary;
306+
let self_time = self.time().clone();
307+
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
308+
Capability::new(self_time, self.internal.borrow()[output_port].clone())
299309
}
300310
else {
301-
panic!("Attempted to acquire a capability for a non-existent output port.");
311+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
302312
}
303313
}
304314
}

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ pub struct OperatorBuilder<G: Scope> {
3131
frontier: Vec<MutableAntichain<G::Timestamp>>,
3232
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3333
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
34+
/// For each input, a shared list of summaries to each output.
35+
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
3436
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3537
logging: Option<Logger>,
3638
}
@@ -45,6 +47,7 @@ impl<G: Scope> OperatorBuilder<G> {
4547
frontier: Vec::new(),
4648
consumed: Vec::new(),
4749
internal: Rc::new(RefCell::new(Vec::new())),
50+
summaries: Vec::new(),
4851
produced: Vec::new(),
4952
logging,
5053
}
@@ -76,13 +79,16 @@ impl<G: Scope> OperatorBuilder<G> {
7679
where
7780
P: ParallelizationContractCore<G::Timestamp, D> {
7881

79-
let puller = self.builder.new_input_connection(stream, pact, connection);
82+
let puller = self.builder.new_input_connection(stream, pact, connection.clone());
8083

8184
let input = PullCounter::new(puller);
8285
self.frontier.push(MutableAntichain::new());
8386
self.consumed.push(input.consumed().clone());
8487

85-
new_input_handle(input, self.internal.clone(), self.logging.clone())
88+
let shared_summary = Rc::new(RefCell::new(connection));
89+
self.summaries.push(shared_summary.clone());
90+
91+
new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
8692
}
8793

8894
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
@@ -101,14 +107,18 @@ impl<G: Scope> OperatorBuilder<G> {
101107
/// antichain indicating that there is no connection from the input to the output.
102108
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {
103109

104-
let (tee, stream) = self.builder.new_output_connection(connection);
110+
let (tee, stream) = self.builder.new_output_connection(connection.clone());
105111

106112
let internal = Rc::new(RefCell::new(ChangeBatch::new()));
107113
self.internal.borrow_mut().push(internal.clone());
108114

109115
let mut buffer = PushBuffer::new(PushCounter::new(tee));
110116
self.produced.push(buffer.inner().produced().clone());
111117

118+
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
119+
summary.borrow_mut().push(connection.clone());
120+
}
121+
112122
(OutputWrapper::new(buffer, internal), stream)
113123
}
114124

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use std::rc::Rc;
77
use std::cell::RefCell;
88

9+
use crate::progress::Antichain;
910
use crate::progress::Timestamp;
1011
use crate::progress::ChangeBatch;
1112
use crate::progress::frontier::MutableAntichain;
@@ -24,6 +25,11 @@ use crate::dataflow::operators::capability::CapabilityTrait;
2425
pub struct InputHandleCore<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> {
2526
pull_counter: PullCounter<T, D, P>,
2627
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
28+
/// Timestamp summaries from this input to each output.
29+
///
30+
/// Each timestamp received through this input may only produce output timestamps
31+
/// greater than or equal to the input timestamp subjected to at least one of these summaries.
32+
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
2733
logging: Option<Logger>,
2834
}
2935

@@ -49,13 +55,14 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
4955
#[inline]
5056
pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
5157
let internal = &self.internal;
58+
let summaries = &self.summaries;
5259
self.pull_counter.next_guarded().map(|(guard, bundle)| {
5360
match bundle.as_ref_or_mut() {
5461
RefOrMut::Ref(bundle) => {
55-
(InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data))
62+
(InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data))
5663
},
5764
RefOrMut::Mut(bundle) => {
58-
(InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
65+
(InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data))
5966
},
6067
}
6168
})
@@ -145,10 +152,16 @@ pub fn _access_pull_counter<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>
145152

146153
/// Constructs an input handle.
147154
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
148-
pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(pull_counter: PullCounter<T, D, P>, internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>, logging: Option<Logger>) -> InputHandleCore<T, D, P> {
155+
pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(
156+
pull_counter: PullCounter<T, D, P>,
157+
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
158+
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
159+
logging: Option<Logger>
160+
) -> InputHandleCore<T, D, P> {
149161
InputHandleCore {
150162
pull_counter,
151163
internal,
164+
summaries,
152165
logging,
153166
}
154167
}

0 commit comments

Comments
 (0)