Skip to content

Commit bbfae94

Browse files
Merge branch 'master' into rushikesh/remove-read-parquet-bulk-api
2 parents 031d2e1 + 683a29d commit bbfae94

File tree

10 files changed

+47
-44
lines changed

10 files changed

+47
-44
lines changed

doc/source/rllib/rllib-algorithms.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,9 @@ Asynchronous Proximal Policy Optimization (APPO)
173173

174174
.. tip::
175175

176-
APPO was originally `published under the name "IMPACT" <https://arxiv.org/abs/1707.06347>`__. RLlib's APPO exactly matches the algorithm described in the paper.
176+
APPO was originally `published under the name "IMPACT" <https://arxiv.org/abs/1912.00167>`__. RLlib's APPO exactly matches the algorithm described in the paper.
177177

178-
`[paper] <https://arxiv.org/abs/1707.06347>`__
178+
`[paper] <https://arxiv.org/abs/1912.00167>`__
179179
`[implementation] <https://github.com/ray-project/ray/blob/master/rllib/algorithms/appo/appo.py>`__
180180

181181
.. figure:: images/algos/appo-architecture.svg

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ def __init__(
343343
self._in_task_output_backpressure = False
344344
self._estimated_num_output_bundles = None
345345
self._estimated_output_num_rows = None
346-
self._execution_finished = False
346+
self._is_execution_marked_finished = False
347347
# The LogicalOperator(s) which were translated to create this PhysicalOperator.
348348
# Set via `PhysicalOperator.set_logical_operators()`.
349349
self._logical_operators: List[LogicalOperator] = []
@@ -401,48 +401,51 @@ def override_target_max_block_size(self, target_max_block_size: Optional[int]):
401401

402402
def mark_execution_finished(self):
403403
"""Manually mark that this operator has finished execution."""
404-
self._execution_finished = True
404+
self._is_execution_marked_finished = True
405405

406-
def execution_finished(self) -> bool:
406+
def has_execution_finished(self) -> bool:
407407
"""Return True when this operator has finished execution.
408408
409409
The outputs may or may not have been taken.
410410
"""
411-
return self._execution_finished
411+
from ..operators.base_physical_operator import InternalQueueOperatorMixin
412+
413+
internal_input_queue_num_blocks = 0
414+
if isinstance(self, InternalQueueOperatorMixin):
415+
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
416+
417+
# NOTE: Execution is considered finished if
418+
# - The operator was explicitly marked finished OR
419+
# - The following auto-completion conditions are met
420+
# - All input blocks have been ingested
421+
# - Internal queue is empty
422+
# - There are no active or pending tasks
423+
424+
return self._is_execution_marked_finished or (
425+
self._inputs_complete
426+
and self.num_active_tasks() == 0
427+
and internal_input_queue_num_blocks == 0
428+
)
412429

413430
def completed(self) -> bool:
414431
"""Returns whether this operator has been fully completed.
415432
416433
An operator is completed iff:
417-
* The operator has finished execution (i.e., `execution_finished()` is True).
434+
* The operator has finished execution (i.e., `has_execution_finished()` is True).
418435
* All outputs have been taken (i.e., `has_next()` is False) from it.
419436
"""
420437
from ..operators.base_physical_operator import InternalQueueOperatorMixin
421438

422-
internal_input_queue_num_blocks = 0
423439
internal_output_queue_num_blocks = 0
424440
if isinstance(self, InternalQueueOperatorMixin):
425-
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
426441
internal_output_queue_num_blocks = self.internal_output_queue_num_blocks()
427442

428-
if not self._execution_finished:
429-
if (
430-
self._inputs_complete
431-
and internal_input_queue_num_blocks == 0
432-
and self.num_active_tasks() == 0
433-
):
434-
# NOTE: Operator is considered completed iff
435-
# - All input blocks have been ingested
436-
# - Internal queue is empty
437-
# - There are no active or pending tasks
438-
self._execution_finished = True
439-
440443
# NOTE: We check for (internal_output_queue_size == 0) and
441444
# (not self.has_next()) because _OrderedOutputQueue can
442445
# return False for self.has_next(), but have a non-empty queue size.
443446
# Draining the internal output queue is important to free object refs.
444447
return (
445-
self._execution_finished
448+
self.has_execution_finished()
446449
and not self.has_next()
447450
and internal_output_queue_num_blocks == 0
448451
)

python/ray/data/_internal/execution/operators/limit_operator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def get_stats(self) -> StatsDict:
117117
def num_outputs_total(self) -> Optional[int]:
118118
# Before execution is completed, we don't know how many output
119119
# bundles we will have. We estimate based off the consumption so far.
120-
if self._execution_finished:
120+
if self.has_execution_finished():
121121
return self._cur_output_bundles
122122
return self._estimated_num_output_bundles
123123

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def is_op_eligible(self, op: PhysicalOperator) -> bool:
341341
not op.throttling_disabled()
342342
# As long as the op has finished execution, even if there are still
343343
# non-taken outputs, we don't need to allocate resources for it.
344-
and not op.execution_finished()
344+
and not op.has_execution_finished()
345345
)
346346

347347
def get_eligible_ops(self) -> List[PhysicalOperator]:
@@ -553,7 +553,7 @@ def _is_op_eligible(op: PhysicalOperator) -> bool:
553553
not op.throttling_disabled()
554554
# As long as the op has finished execution, even if there are still
555555
# non-taken outputs, we don't need to allocate resources for it.
556-
and not op.execution_finished()
556+
and not op.has_execution_finished()
557557
)
558558

559559
def _get_downstream_eligible_ops(
@@ -674,9 +674,9 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
674674
ops_to_exclude_from_reservation = []
675675
# Traverse operator tree collecting all operators that have already finished
676676
for op in self._topology:
677-
if not op.execution_finished():
677+
if not op.has_execution_finished():
678678
for dep in op.input_dependencies:
679-
if dep.execution_finished():
679+
if dep.has_execution_finished():
680680
last_completed_ops.append(dep)
681681

682682
# In addition to completed operators,

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ def _get_state_dict(self, state):
652652
"total_rows": op.num_output_rows_total(),
653653
"queued_blocks": op_state.total_enqueued_input_blocks(),
654654
"state": DatasetState.FINISHED.name
655-
if op.execution_finished()
655+
if op.has_execution_finished()
656656
else state,
657657
}
658658
for i, (op, op_state) in enumerate(self._topology.items())

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ def update_operator_states(topology: Topology) -> None:
672672
# Drain external input queue if current operator is execution finished.
673673
# This is needed when the limit is reached, and `mark_execution_finished`
674674
# is called manually.
675-
if op.execution_finished():
675+
if op.has_execution_finished():
676676
for input_queue in op_state.input_queues:
677677
# Drain input queue
678678
input_queue.clear()

python/ray/data/tests/test_operators.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ def test_limit_operator(ray_start_regular_shared):
10551055
while input_op.has_next() and not limit_op._limit_reached():
10561056
loop_count += 1
10571057
assert not limit_op.completed(), limit
1058-
assert not limit_op._execution_finished, limit
1058+
assert not limit_op.has_execution_finished(), limit
10591059
limit_op.add_input(input_op.get_next(), 0)
10601060
while limit_op.has_next():
10611061
# Drain the outputs. So the limit operator
@@ -1066,12 +1066,12 @@ def test_limit_operator(ray_start_regular_shared):
10661066
assert limit_op.mark_execution_finished.call_count == 1, limit
10671067
assert limit_op.completed(), limit
10681068
assert limit_op._limit_reached(), limit
1069-
assert limit_op._execution_finished, limit
1069+
assert limit_op.has_execution_finished(), limit
10701070
else:
10711071
assert limit_op.mark_execution_finished.call_count == 0, limit
10721072
assert not limit_op.completed(), limit
10731073
assert not limit_op._limit_reached(), limit
1074-
assert not limit_op._execution_finished, limit
1074+
assert not limit_op.has_execution_finished(), limit
10751075
limit_op.mark_execution_finished()
10761076
# After inputs done, the number of output bundles
10771077
# should be the same as the number of `add_input`s.

python/ray/data/tests/test_streaming_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def test_update_operator_states_drains_upstream(ray_start_regular_shared):
258258

259259
# Manually mark o2 as execution finished (simulating limit operator behavior)
260260
o2.mark_execution_finished()
261-
assert o2.execution_finished(), "o2 should be execution finished"
261+
assert o2.has_execution_finished(), "o2 should be execution finished"
262262

263263
# Call update_operator_states - this should drain o1's output queue
264264
update_operator_states(topo)

rllib/env/env_runner.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,10 @@ def _try_env_step(self, actions):
258258
except Exception as e:
259259
self.metrics.log_value(NUM_ENV_STEP_FAILURES_LIFETIME, 1, reduce="sum")
260260

261-
# @OldAPIStack (config.restart_failed_sub_environments)
262261
if self.config.restart_failed_sub_environments:
263262
if not isinstance(e, StepFailedRecreateEnvError):
264263
logger.exception(
265-
"Stepping the env resulted in an error! The original error "
266-
f"is: {e}"
264+
f"RLlib {self.__class__.__name__}: Environment step failed. Will force reset env(s) in this EnvRunner. The original error is: {e}"
267265
)
268266
# Recreate the env.
269267
self.make_env()
@@ -272,11 +270,16 @@ def _try_env_step(self, actions):
272270
# data and repeating the step attempt).
273271
return ENV_STEP_FAILURE
274272
else:
275-
if isinstance(e, StepFailedRecreateEnvError):
276-
raise ValueError(
277-
"Environment raised StepFailedRecreateEnvError but config.restart_failed_sub_environments is False."
278-
) from e
279-
raise e
273+
logger.exception(
274+
f"RLlib {self.__class__.__name__}: Environment step failed and "
275+
"'config.restart_failed_sub_environments' is False. "
276+
"This env will not be recreated. "
277+
"Consider setting 'fault_tolerance(restart_failed_sub_environments=True)' in your AlgorithmConfig "
278+
"in order to automatically re-create and force-reset an env."
279+
f"The original error type: {type(e)}. "
280+
f"{e}"
281+
)
282+
raise RuntimeError from e
280283

281284
def _convert_to_tensor(self, struct) -> TensorType:
282285
"""Converts structs to a framework-specific tensor."""

rllib/env/multi_agent_env_runner.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,6 @@ def _sample(
363363
# Try stepping the environment.
364364
results = self._try_env_step(actions_for_env)
365365
if results == ENV_STEP_FAILURE:
366-
logging.warning(
367-
f"RLlib {self.__class__.__name__}: Environment step failed. Will force reset env(s) in this EnvRunner."
368-
)
369366
return self._sample(
370367
num_timesteps=num_timesteps,
371368
num_episodes=num_episodes,

0 commit comments

Comments
 (0)