Skip to content

Commit

Permalink
Cursor based on trace's upper
Browse files Browse the repository at this point in the history
Fixes a bug where we'd obtain a trace's cursor based on a batch's lower
frontier. The trace can be compacted past the batch's lower frontier,
which violates `curso_through`'s assumptions. Instead, we use the trace's
upper to obtain the cursor.

Fixes #526.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Oct 18, 2024
1 parent b5046c8 commit 2865900
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 5 deletions.
48 changes: 46 additions & 2 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ where

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limits of known-complete timestamps.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {
Expand All @@ -80,7 +81,8 @@ where
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
trace.advance_upper(&mut lower_limit);
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
upper_limit.clone_from(batch.upper());

while let Some(key) = batch_cursor.get_key(&batch) {
Expand Down Expand Up @@ -124,3 +126,45 @@ where
.as_collection()
}
}

#[cfg(test)]
mod tests {
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use timely::progress::frontier::AntichainRef;

use crate::input::Input;
use crate::operators::CountTotal;
use crate::operators::arrange::ArrangeBySelf;
use crate::trace::TraceReader;

#[test]
fn test_count_total() {
timely::execute_directly(move |worker| {
let mut probe = ProbeHandle::new();
let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
let (handle, input) = scope.new_collection();
let arrange = input.arrange_by_self();
arrange.stream.probe_with(&mut probe);
(handle, arrange.trace)
});

// ingest some batches
for _ in 0..10 {
input.insert(10);
input.advance_to(input.time() + 1);
input.flush();
worker.step_while(|| probe.less_than(input.time()));
}

// advance the trace
trace.set_physical_compaction(AntichainRef::new(&[2]));
trace.set_logical_compaction(AntichainRef::new(&[2]));

worker.dataflow::<u32, _, _>(|scope| {
let arrange = trace.import(scope);
arrange.count_total();
});
});
}
}
49 changes: 46 additions & 3 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ where

self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

// tracks the upper limit of known-complete timestamps.
// tracks the lower and upper limits of known-complete timestamps.
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {
Expand All @@ -126,9 +127,9 @@ where
batches.swap(&mut buffer);
let mut session = output.session(&capability);
for batch in buffer.drain(..) {

trace.advance_upper(&mut lower_limit);
let mut batch_cursor = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

upper_limit.clone_from(batch.upper());

Expand Down Expand Up @@ -187,3 +188,45 @@ where
.as_collection()
}
}

#[cfg(test)]
mod tests {
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use timely::progress::frontier::AntichainRef;

use crate::input::Input;
use crate::operators::ThresholdTotal;
use crate::operators::arrange::ArrangeBySelf;
use crate::trace::TraceReader;

#[test]
fn test_threshold_total() {
timely::execute_directly(move |worker| {
let mut probe = ProbeHandle::new();
let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
let (handle, input) = scope.new_collection();
let arrange = input.arrange_by_self();
arrange.stream.probe_with(&mut probe);
(handle, arrange.trace)
});

// ingest some batches
for _ in 0..10 {
input.insert(10);
input.advance_to(input.time() + 1);
input.flush();
worker.step_while(|| probe.less_than(input.time()));
}

// advance the trace
trace.set_physical_compaction(AntichainRef::new(&[2]));
trace.set_logical_compaction(AntichainRef::new(&[2]));

worker.dataflow::<u32, _, _>(|scope| {
let arrange = trace.import(scope);
arrange.threshold_total(|_,c| c % 2);
});
});
}
}

0 comments on commit 2865900

Please sign in to comment.