Skip to content

Commit a23d685

Browse files
simeetnayan81 authored and bveeramani committed
[Data] Refactor PhysicalOperator.completed to fix side effects (ray-project#58915)
# Description This PR refactors the `PhysicalOperator` class to eliminate hidden side effects in the `completed()` method. Previously, calling `completed()` could inadvertently modify the internal state of the operator, which could lead to unexpected behavior. This change separates the logic for checking if the operator is marked as finished from the logic that computes whether it is actually finished. Key changes include: - Renaming `_execution_finished` to `_is_execution_marked_finished` to clarify its purpose. - Renaming `execution_finished()` to `has_execution_finished()` and making it a pure computed property without side effects. - Updating the `completed()` method to call `has_execution_finished()` instead of modifying internal state. - Ensuring that `mark_execution_finished()` correctly sets the renamed field. ## Related issues Fixes ray-project#58884 ## Additional information This refactor ensures that both `has_execution_finished()` and `completed()` are pure query methods, allowing them to be called multiple times without altering the state of the operator. --------- Signed-off-by: Simeet Nayan <simeetnayan.8100@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
1 parent 23277f1 commit a23d685

File tree

7 files changed

+34
-31
lines changed

7 files changed

+34
-31
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ def __init__(
343343
self._in_task_output_backpressure = False
344344
self._estimated_num_output_bundles = None
345345
self._estimated_output_num_rows = None
346-
self._execution_finished = False
346+
self._is_execution_marked_finished = False
347347
# The LogicalOperator(s) which were translated to create this PhysicalOperator.
348348
# Set via `PhysicalOperator.set_logical_operators()`.
349349
self._logical_operators: List[LogicalOperator] = []
@@ -401,48 +401,51 @@ def override_target_max_block_size(self, target_max_block_size: Optional[int]):
401401

402402
def mark_execution_finished(self):
403403
"""Manually mark that this operator has finished execution."""
404-
self._execution_finished = True
404+
self._is_execution_marked_finished = True
405405

406-
def execution_finished(self) -> bool:
406+
def has_execution_finished(self) -> bool:
407407
"""Return True when this operator has finished execution.
408408
409409
The outputs may or may not have been taken.
410410
"""
411-
return self._execution_finished
411+
from ..operators.base_physical_operator import InternalQueueOperatorMixin
412+
413+
internal_input_queue_num_blocks = 0
414+
if isinstance(self, InternalQueueOperatorMixin):
415+
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
416+
417+
# NOTE: Execution is considered finished if
418+
# - The operator was explicitly marked finished OR
419+
# - The following auto-completion conditions are met
420+
# - All input blocks have been ingested
421+
# - Internal queue is empty
422+
# - There are no active or pending tasks
423+
424+
return self._is_execution_marked_finished or (
425+
self._inputs_complete
426+
and self.num_active_tasks() == 0
427+
and internal_input_queue_num_blocks == 0
428+
)
412429

413430
def completed(self) -> bool:
414431
"""Returns whether this operator has been fully completed.
415432
416433
An operator is completed iff:
417-
* The operator has finished execution (i.e., `execution_finished()` is True).
434+
* The operator has finished execution (i.e., `has_execution_finished()` is True).
418435
* All outputs have been taken (i.e., `has_next()` is False) from it.
419436
"""
420437
from ..operators.base_physical_operator import InternalQueueOperatorMixin
421438

422-
internal_input_queue_num_blocks = 0
423439
internal_output_queue_num_blocks = 0
424440
if isinstance(self, InternalQueueOperatorMixin):
425-
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
426441
internal_output_queue_num_blocks = self.internal_output_queue_num_blocks()
427442

428-
if not self._execution_finished:
429-
if (
430-
self._inputs_complete
431-
and internal_input_queue_num_blocks == 0
432-
and self.num_active_tasks() == 0
433-
):
434-
# NOTE: Operator is considered completed iff
435-
# - All input blocks have been ingested
436-
# - Internal queue is empty
437-
# - There are no active or pending tasks
438-
self._execution_finished = True
439-
440443
# NOTE: We check for (internal_output_queue_size == 0) and
441444
# (not self.has_next()) because _OrderedOutputQueue can
442445
# return False for self.has_next(), but have a non-empty queue size.
443446
# Draining the internal output queue is important to free object refs.
444447
return (
445-
self._execution_finished
448+
self.has_execution_finished()
446449
and not self.has_next()
447450
and internal_output_queue_num_blocks == 0
448451
)

python/ray/data/_internal/execution/operators/limit_operator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def get_stats(self) -> StatsDict:
117117
def num_outputs_total(self) -> Optional[int]:
118118
# Before execution is completed, we don't know how many output
119119
# bundles we will have. We estimate based off the consumption so far.
120-
if self._execution_finished:
120+
if self.has_execution_finished():
121121
return self._cur_output_bundles
122122
return self._estimated_num_output_bundles
123123

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def is_op_eligible(self, op: PhysicalOperator) -> bool:
341341
not op.throttling_disabled()
342342
# As long as the op has finished execution, even if there are still
343343
# non-taken outputs, we don't need to allocate resources for it.
344-
and not op.execution_finished()
344+
and not op.has_execution_finished()
345345
)
346346

347347
def get_eligible_ops(self) -> List[PhysicalOperator]:
@@ -553,7 +553,7 @@ def _is_op_eligible(op: PhysicalOperator) -> bool:
553553
not op.throttling_disabled()
554554
# As long as the op has finished execution, even if there are still
555555
# non-taken outputs, we don't need to allocate resources for it.
556-
and not op.execution_finished()
556+
and not op.has_execution_finished()
557557
)
558558

559559
def _get_downstream_eligible_ops(
@@ -674,9 +674,9 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
674674
ops_to_exclude_from_reservation = []
675675
# Traverse operator tree collecting all operators that have already finished
676676
for op in self._topology:
677-
if not op.execution_finished():
677+
if not op.has_execution_finished():
678678
for dep in op.input_dependencies:
679-
if dep.execution_finished():
679+
if dep.has_execution_finished():
680680
last_completed_ops.append(dep)
681681

682682
# In addition to completed operators,

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ def _get_state_dict(self, state):
652652
"total_rows": op.num_output_rows_total(),
653653
"queued_blocks": op_state.total_enqueued_input_blocks(),
654654
"state": DatasetState.FINISHED.name
655-
if op.execution_finished()
655+
if op.has_execution_finished()
656656
else state,
657657
}
658658
for i, (op, op_state) in enumerate(self._topology.items())

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ def update_operator_states(topology: Topology) -> None:
672672
# Drain external input queue if current operator is execution finished.
673673
# This is needed when the limit is reached, and `mark_execution_finished`
674674
# is called manually.
675-
if op.execution_finished():
675+
if op.has_execution_finished():
676676
for input_queue in op_state.input_queues:
677677
# Drain input queue
678678
input_queue.clear()

python/ray/data/tests/test_operators.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ def test_limit_operator(ray_start_regular_shared):
10551055
while input_op.has_next() and not limit_op._limit_reached():
10561056
loop_count += 1
10571057
assert not limit_op.completed(), limit
1058-
assert not limit_op._execution_finished, limit
1058+
assert not limit_op.has_execution_finished(), limit
10591059
limit_op.add_input(input_op.get_next(), 0)
10601060
while limit_op.has_next():
10611061
# Drain the outputs. So the limit operator
@@ -1066,12 +1066,12 @@ def test_limit_operator(ray_start_regular_shared):
10661066
assert limit_op.mark_execution_finished.call_count == 1, limit
10671067
assert limit_op.completed(), limit
10681068
assert limit_op._limit_reached(), limit
1069-
assert limit_op._execution_finished, limit
1069+
assert limit_op.has_execution_finished(), limit
10701070
else:
10711071
assert limit_op.mark_execution_finished.call_count == 0, limit
10721072
assert not limit_op.completed(), limit
10731073
assert not limit_op._limit_reached(), limit
1074-
assert not limit_op._execution_finished, limit
1074+
assert not limit_op.has_execution_finished(), limit
10751075
limit_op.mark_execution_finished()
10761076
# After inputs done, the number of output bundles
10771077
# should be the same as the number of `add_input`s.

python/ray/data/tests/test_streaming_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def test_update_operator_states_drains_upstream(ray_start_regular_shared):
258258

259259
# Manually mark o2 as execution finished (simulating limit operator behavior)
260260
o2.mark_execution_finished()
261-
assert o2.execution_finished(), "o2 should be execution finished"
261+
assert o2.has_execution_finished(), "o2 should be execution finished"
262262

263263
# Call update_operator_states - this should drain o1's output queue
264264
update_operator_states(topo)

0 commit comments

Comments
 (0)