Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def detect(self) -> List[Issue]:
op_task_stats = op_task_stats_map[op_id]
for task_idx, state_value in op_state_values.items():
curr_time = time.perf_counter() - state_value.start_time_hanging
if op_task_stats.count() > self._op_task_stats_min_count:
if op_task_stats.count() >= self._op_task_stats_min_count:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using >= here is more intuitive. When we see min_count, it's more natural to read the check as count >= min_count.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good catch!

mean = op_task_stats.mean()
stddev = op_task_stats.stddev()
threshold = mean + self._op_task_stats_std_factor_threshold * stddev
Expand Down
63 changes: 38 additions & 25 deletions python/ray/data/tests/test_issue_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,49 @@ def test_hanging_detector_detects_issues(
):
"""Test hanging detector adaptive thresholds with real Ray Data pipelines and extreme configurations."""

ctx = DataContext.get_current()
# Configure hanging detector with extreme std_factor values
ctx.issue_detectors_config.hanging_detector_config = (
HangingExecutionIssueDetectorConfig(
op_task_stats_min_count=1,
op_task_stats_std_factor=1,
detection_time_interval_s=0,
)
from ray.data._internal.issue_detection.detectors.hanging_detector import (
HangingExecutionState as RealHangingExecutionState,
)

# Create a pipeline with many small blocks to ensure concurrent tasks
def sleep_task(x):
if x["id"] == 2:
# Issue detection is based on the mean + stddev. One of the tasks must take
# a while, so we sleep for just one of the rows.
time.sleep(1)
return x
def mock_state_constructor(**kwargs):
# set start_time_hanging to 0 for task with task_idx == 1 to make
# time.perf_counter() - state_value.start_time_hanging large
if kwargs.get("task_idx") == 1:
kwargs["start_time_hanging"] = 0.0
# Call the real class with kwargs modified
return RealHangingExecutionState(**kwargs)

with patch(
"ray.data._internal.issue_detection.detectors.hanging_detector.HangingExecutionState"
) as mock_state_cls:
mock_state_cls.side_effect = mock_state_constructor
Comment on lines +111 to +122
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems hacky to modify internal logic of start_time_hanging, let's just mock the detector and make it a unit test.

Copy link
Contributor Author

@machichima machichima Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For making it into a unit test, should we move it into python/ray/data/tests/unit/?

And for mock the detector, do you mean mock the whole HangingExecutionIssueDetector? I thought we want to test if HangingExecutionIssueDetector.detect works correctly here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For making it into a unit test, should we move it into python/ray/data/tests/unit/?

I think its fine to leave the test here.

We just want to verify that the issue is produced when the condition is met; therefore, we don't actually need to execute the dataset.

This will deflake the test and make it deterministic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I think the better approach is to hang a specific task instead of modifying its start time.

We should do something like this:

        # Create a pipeline with many small blocks to ensure concurrent tasks
        def sleep_task(x):
            if x["id"] == 2:
                time.sleep(1.0)
            return x
...

And remove this part

        def mock_state_constructor(**kwargs):
            # set start_time_hanging to 0 for task with task_idx == 1 to make
            # time.perf_counter() - state_value.start_time_hanging large
            if kwargs.get("task_idx") == 1:
                kwargs["start_time_hanging"] = 0.0
            # Call the real class with kwargs modified
            return RealHangingExecutionState(**kwargs)

        with patch(
            "ray.data._internal.issue_detection.detectors.hanging_detector.HangingExecutionState"
        ) as mock_state_cls:
            mock_state_cls.side_effect = mock_state_constructor


ctx = DataContext.get_current()
# Configure hanging detector with extreme std_factor values
ctx.issue_detectors_config.hanging_detector_config = (
HangingExecutionIssueDetectorConfig(
op_task_stats_min_count=1,
op_task_stats_std_factor=1,
detection_time_interval_s=0,
)
)

with caplog.at_level(logging.WARNING):
ray.data.range(3, override_num_blocks=3).map(
sleep_task, concurrency=1
).materialize()
# Create a pipeline with many small blocks to ensure concurrent tasks
def sleep_task(x):
return x

# Check if hanging detection occurred
hanging_detected = (
"has been running for" in caplog.text
and "longer than the average task duration" in caplog.text
)
with caplog.at_level(logging.WARNING):
ray.data.range(3, override_num_blocks=3).map(
sleep_task, concurrency=1
).materialize()

# Check if hanging detection occurred
hanging_detected = (
"has been running for" in caplog.text
and "longer than the average task duration" in caplog.text
)

assert hanging_detected, caplog.text
assert hanging_detected, caplog.text


@pytest.mark.parametrize(
Expand Down