Skip to content

Commit 1cf7b4a

Browse files
Activate operators that may want to shut down (#488)
1 parent faf5eb6 commit 1cf7b4a

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

timely/src/progress/subgraph.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ where
200200
incomplete_count,
201201
activations,
202202
temp_active: BinaryHeap::new(),
203+
maybe_shutdown: Vec::new(),
203204
children: self.children,
204205
input_messages: self.input_messages,
205206
output_capabilities: self.output_capabilities,
@@ -242,6 +243,7 @@ where
242243
// shared activations (including children).
243244
activations: Rc<RefCell<Activations>>,
244245
temp_active: BinaryHeap<Reverse<usize>>,
246+
maybe_shutdown: Vec<usize>,
245247

246248
// shared state written to by the datapath, counting records entering this subgraph instance.
247249
input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
@@ -461,6 +463,7 @@ where
461463

462464
// Drain propagated information into shared progress structure.
463465
for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
466+
self.maybe_shutdown.push(location.node);
464467
// Targets are actionable, sources are not.
465468
if let crate::progress::Port::Target(port) = location.port {
466469
if self.children[location.node].notify {
@@ -477,6 +480,18 @@ where
477480
}
478481
}
479482

483+
// Consider scheduling each recipient of progress information to shut down.
484+
self.maybe_shutdown.sort();
485+
self.maybe_shutdown.dedup();
486+
for child_index in self.maybe_shutdown.drain(..) {
487+
let child_state = self.pointstamp_tracker.node_state(child_index);
488+
let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
489+
let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
490+
if frontiers_empty && no_capabilities {
491+
self.temp_active.push(Reverse(child_index));
492+
}
493+
}
494+
480495
// Extract child zero frontier changes and report as internal capability changes.
481496
for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
482497
self.pointstamp_tracker

0 commit comments

Comments
 (0)