Add a test for double counting orphans
There are cases where a block is marked as orphaned by multiple blocks and is double counted. This leads to strange behavior due to a miscalculation of the number of blocks available for processing.
pattonw committed Jul 10, 2024
1 parent 84cf215 commit 74e88ef
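
The failure mode described in the commit message is a classic double count in a graph traversal: a block that sits downstream of more than one failed block is reachable along several paths, and a tally that does not remember which blocks it has already seen counts that block once per path. A minimal sketch of the pattern (illustrative only, not daisy's implementation; the downstream mapping and function names are made up for this example):

# Toy dependency graph: block "c" is downstream of both "a" and "b",
# so when "a" and "b" both fail it is reachable along two paths.
downstream = {
    "a": ["c"],
    "b": ["c"],
    "c": [],
}


def count_orphans_naive(failed):
    # Tally every downstream block reached, even if it was seen before.
    count = 0
    stack = list(failed)
    while stack:
        node = stack.pop()
        for down in downstream[node]:
            count += 1  # "c" is counted once per failed parent
            stack.append(down)
    return count


def count_orphans_dedup(failed):
    # Remember which blocks are already orphaned so each is counted once.
    orphans = set()
    stack = list(failed)
    while stack:
        node = stack.pop()
        for down in downstream[node]:
            if down not in orphans:
                orphans.add(down)
                stack.append(down)
    return len(orphans)


print(count_orphans_naive(["a", "b"]))  # 2: "c" is double counted
print(count_orphans_dedup(["a", "b"]))  # 1

Keeping orphans in a set and checking membership before counting is the kind of deduplication the ready_surface.py change below applies.
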
Showing 2 changed files with 31 additions and 7 deletions.
daisy/ready_surface.py: 10 changes (3 additions, 7 deletions)
@@ -16,7 +16,7 @@ class ReadySurface:
    2) it has downstream dependencies marked as OTHER
    A node is a BOUNDARY node iff
    1) it has been marked as failed
    2) It has upstream dependencies marked as SURFACE
    2) it has upstream dependencies marked as SURFACE
    """

def __init__(self, get_downstream_nodes, get_upstream_nodes):
@@ -103,19 +103,15 @@ def mark_failure(self, node, count_all_orphans=False):
        # recurse through downstream nodes, adding them to boundary if
        # necessary
        down_nodes = set(self.downstream(node))
        orphans = set(down_nodes)
        orphans = set()
        while len(down_nodes) > 0:
            down_node = down_nodes.pop()
            if self.__add_to_boundary(down_node):
                # check if any nodes downstream of this node are also boundary
                # nodes.
                new_nodes = set(self.downstream(down_node)) - orphans
                down_nodes = down_nodes.union(new_nodes)
                orphans = orphans.union(new_nodes)
            elif count_all_orphans:
                new_nodes = set(self.downstream(down_node)) - orphans
                down_nodes = down_nodes.union(new_nodes)
                orphans - orphans.union(new_nodes)
                orphans.add(down_node)

        # check if any of the upstream nodes can be removed from surface
        for up_node in up_nodes:
tests/test_scheduler.py: 28 changes (28 additions, 0 deletions)
@@ -513,3 +513,31 @@ def test_zero_levels_failure(task_zero_levels):
        task_state.failed_count + task_state.orphaned_count
        == task_state.total_block_count
    ), task_state

def test_orphan_double_counting():
    def process_block(block):
        pass

    task = Task(
        task_id="test_orphans",
        total_roi=Roi((0, 0), (25, 25)),
        read_roi=Roi((0, 0), (7, 7)),
        write_roi=Roi((3, 3), (1, 1)),
        process_function=process_block,
        check_function=None,
        read_write_conflict=True,
    )
    scheduler = Scheduler([task])

    while True:
        block = scheduler.acquire_block(task.task_id)
        if block is None:
            break
        block.status = BlockStatus.FAILED
        scheduler.release_block(block)

    task_state = scheduler.task_states[task.task_id]
    assert (
        task_state.failed_count + task_state.orphaned_count
        == task_state.total_block_count
    ), task_state
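
The new test drives every block of a small task to FAILED and then checks the invariant behind the fix: each block should be recorded exactly once, as either failed or orphaned, so failed_count + orphaned_count must equal total_block_count, and a double-counted orphan would push the sum past the total. The small write_roi inside a much larger read_roi, combined with read_write_conflict=True, gives each block many neighbouring dependents, which is what lets a single block be orphaned by several failed blocks at once. The import block of tests/test_scheduler.py is not part of this diff, so the following line is only an assumption about what a standalone copy of the test would need from daisy's public API:

from daisy import BlockStatus, Roi, Scheduler, Task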
