Skip to content

Commit 5b808ff

Browse files
Back MutableAntichain by ChangeBatch (#505)
1 parent 592914b commit 5b808ff

File tree

1 file changed

+37
-73
lines changed

1 file changed

+37
-73
lines changed

timely/src/progress/frontier.rs

Lines changed: 37 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -308,14 +308,9 @@ impl<T> ::std::iter::IntoIterator for Antichain<T> {
308308
/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
309309
/// and it is acceptable to rebuild the frontier from scratch when a batch of updates changes it. This means
310310
/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
311-
///
312-
/// There is an `update_dirty` method for single updates that leave the `MutableAntichain` in a dirty state,
313-
/// but I strongly recommend against using them unless you must (on part of timely progress tracking seems
314-
/// to be greatly simplified by access to this)
315311
#[derive(Clone, Debug, Abomonation, Serialize, Deserialize)]
316312
pub struct MutableAntichain<T> {
317-
dirty: usize,
318-
updates: Vec<(T, i64)>,
313+
updates: ChangeBatch<T>,
319314
frontier: Vec<T>,
320315
changes: ChangeBatch<T>,
321316
}
@@ -334,8 +329,7 @@ impl<T> MutableAntichain<T> {
334329
#[inline]
335330
pub fn new() -> MutableAntichain<T> {
336331
MutableAntichain {
337-
dirty: 0,
338-
updates: Vec::new(),
332+
updates: ChangeBatch::new(),
339333
frontier: Vec::new(),
340334
changes: ChangeBatch::new(),
341335
}
@@ -354,21 +348,11 @@ impl<T> MutableAntichain<T> {
354348
///```
355349
#[inline]
356350
pub fn clear(&mut self) {
357-
self.dirty = 0;
358351
self.updates.clear();
359352
self.frontier.clear();
360353
self.changes.clear();
361354
}
362355

363-
/// This method deletes the contents. Unlike `clear` it records doing so.
364-
pub fn empty(&mut self) {
365-
for (_, diff) in self.updates.iter_mut() {
366-
*diff = 0;
367-
}
368-
369-
self.dirty = self.updates.len();
370-
}
371-
372356
/// Reveals the minimal elements with positive count.
373357
///
374358
/// # Examples
@@ -381,7 +365,6 @@ impl<T> MutableAntichain<T> {
381365
///```
382366
#[inline]
383367
pub fn frontier(&self) -> AntichainRef<'_, T> {
384-
debug_assert_eq!(self.dirty, 0);
385368
AntichainRef::new(&self.frontier)
386369
}
387370

@@ -396,13 +379,12 @@ impl<T> MutableAntichain<T> {
396379
/// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
397380
///```
398381
#[inline]
399-
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
382+
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
400383
where
401-
T: Clone,
384+
T: Ord+Clone,
402385
{
403386
MutableAntichain {
404-
dirty: 0,
405-
updates: vec![(bottom.clone(), 1)],
387+
updates: ChangeBatch::new_from(bottom.clone(), 1),
406388
frontier: vec![bottom],
407389
changes: ChangeBatch::new(),
408390
}
@@ -420,7 +402,6 @@ impl<T> MutableAntichain<T> {
420402
///```
421403
#[inline]
422404
pub fn is_empty(&self) -> bool {
423-
debug_assert_eq!(self.dirty, 0);
424405
self.frontier.is_empty()
425406
}
426407

@@ -441,7 +422,6 @@ impl<T> MutableAntichain<T> {
441422
where
442423
T: PartialOrder,
443424
{
444-
debug_assert_eq!(self.dirty, 0);
445425
self.frontier().less_than(time)
446426
}
447427

@@ -462,22 +442,9 @@ impl<T> MutableAntichain<T> {
462442
where
463443
T: PartialOrder,
464444
{
465-
debug_assert_eq!(self.dirty, 0);
466445
self.frontier().less_equal(time)
467446
}
468447

469-
/// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned.
470-
///
471-
/// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed).
472-
/// It is *very* important if you want to use this method that very soon afterwards you call something
473-
/// akin to `update_iter`, perhaps with a `None` argument if you have no more data, as this method will
474-
/// tidy up the internal representation.
475-
#[inline]
476-
pub fn update_dirty(&mut self, time: T, delta: i64) {
477-
self.updates.push((time, delta));
478-
self.dirty += 1;
479-
}
480-
481448
/// Applies updates to the antichain and enumerates any changes.
482449
///
483450
/// # Examples
@@ -502,40 +469,28 @@ impl<T> MutableAntichain<T> {
502469
{
503470
let updates = updates.into_iter();
504471

505-
// Attempt to pre-allocate for the new updates
506-
let (min, max) = updates.size_hint();
507-
self.updates.reserve(max.unwrap_or(min));
508-
509-
for (time, delta) in updates {
510-
self.updates.push((time, delta));
511-
self.dirty += 1;
512-
}
513-
514472
// track whether a rebuild is needed.
515473
let mut rebuild_required = false;
474+
for (time, delta) in updates {
516475

517-
// determine if recently pushed data requires rebuilding the frontier.
518-
// note: this may be required even with an empty iterator, due to dirty data in self.updates.
519-
while self.dirty > 0 && !rebuild_required {
520-
521-
let time = &self.updates[self.updates.len() - self.dirty].0;
522-
let delta = self.updates[self.updates.len() - self.dirty].1;
523-
524-
let beyond_frontier = self.frontier.iter().any(|f| f.less_than(time));
525-
let before_frontier = !self.frontier.iter().any(|f| f.less_equal(time));
526-
rebuild_required = rebuild_required || !(beyond_frontier || (delta < 0 && before_frontier));
476+
// If we do not yet require a rebuild, test whether we might require one
477+
// and set the flag in that case.
478+
if !rebuild_required {
479+
let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time));
480+
let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time));
481+
rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier));
482+
}
527483

528-
self.dirty -= 1;
484+
self.updates.update(time, delta);
529485
}
530-
self.dirty = 0;
531486

532487
if rebuild_required {
533488
self.rebuild()
534489
}
535490
self.changes.drain()
536491
}
537492

538-
/// Sorts and consolidates `self.updates` and applies `action` to any frontier changes.
493+
/// Rebuilds `self.frontier` from `self.updates`.
539494
///
540495
/// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
541496
/// for single updates, but is meant to be an efficient way to process multiple updates together. This is
@@ -544,19 +499,6 @@ impl<T> MutableAntichain<T> {
544499
where
545500
T: Clone + PartialOrder + Ord,
546501
{
547-
548-
// sort and consolidate updates; retain non-zero accumulations.
549-
if !self.updates.is_empty() {
550-
self.updates.sort_by(|x,y| x.0.cmp(&y.0));
551-
for i in 0 .. self.updates.len() - 1 {
552-
if self.updates[i].0 == self.updates[i+1].0 {
553-
self.updates[i+1].1 += self.updates[i].1;
554-
self.updates[i].1 = 0;
555-
}
556-
}
557-
self.updates.retain(|x| x.1 != 0);
558-
}
559-
560502
for time in self.frontier.drain(..) {
561503
self.changes.update(time, -1);
562504
}
@@ -580,6 +522,7 @@ impl<T> MutableAntichain<T> {
580522
T: Ord,
581523
{
582524
self.updates
525+
.unstable_internal_updates()
583526
.iter()
584527
.filter(|td| td.0.eq(query_time))
585528
.map(|td| td.1)
@@ -804,4 +747,25 @@ mod tests {
804747
assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)])));
805748
assert!(!hashed.contains(&Antichain::from(vec![])));
806749
}
750+
751+
#[test]
752+
fn mutable_compaction() {
753+
let mut mutable = MutableAntichain::new();
754+
mutable.update_iter(Some((7, 1)));
755+
mutable.update_iter(Some((7, 1)));
756+
mutable.update_iter(Some((7, 1)));
757+
mutable.update_iter(Some((7, 1)));
758+
mutable.update_iter(Some((7, 1)));
759+
mutable.update_iter(Some((7, 1)));
760+
mutable.update_iter(Some((8, 1)));
761+
mutable.update_iter(Some((8, 1)));
762+
mutable.update_iter(Some((8, 1)));
763+
mutable.update_iter(Some((8, 1)));
764+
mutable.update_iter(Some((8, 1)));
765+
for _ in 0 .. 1000 {
766+
mutable.update_iter(Some((9, 1)));
767+
mutable.update_iter(Some((9, -1)));
768+
}
769+
assert!(mutable.updates.unstable_internal_updates().len() <= 32);
770+
}
807771
}

0 commit comments

Comments
 (0)