Skip to content

Conversation

@machichima
Copy link
Contributor

@machichima machichima commented Nov 14, 2025

Description

  1. mock the HangingExecutionState to set start_hanging_time to 0 for one of the calls

Related issues

Closes #58562

Additional information

While this PR is trying to fix the flaky test, I use following script to run test for 20 times and ensure they all passed

#!/bin/bash

pass=0
fail=0

for i in {1..20}; do
  if python -m pytest python/ray/data/tests/test_issue_detection.py::TestHangingExecutionIssueDetector::test_hanging_detector_detects_issues -xvs > /dev/null 2>&1; then
    ((pass++))
  else
    ((fail++))
  fi
done

echo ""
echo "Passed: $pass, Failed: $fail"
image

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima requested a review from a team as a code owner November 14, 2025 11:38
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a flaky test by mocking the hanging task detection logic, which is a great improvement. The extraction of _is_task_hanging is a clean way to enable this mocking. I have two suggestions: one is to remove a leftover debug print statement, and the other is a minor style improvement in the test mock implementation for better clarity.

@machichima
Copy link
Contributor Author

machichima commented Nov 14, 2025

@owenowenisme @bveeramani PTAL. Thank you~

for task_idx, state_value in op_state_values.items():
curr_time = time.perf_counter() - state_value.start_time_hanging
if op_task_stats.count() > self._op_task_stats_min_count:
if op_task_stats.count() >= self._op_task_stats_min_count:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using >= here is more intuitive? As when we see min_count it's more likely for us to view it as count >= min_count

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good catch!

Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima force-pushed the 58562-hanging-detector-flaky branch from 82ab8fc to af1039d Compare November 14, 2025 12:34
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Nov 14, 2025
Comment on lines +111 to +122
def mock_state_constructor(**kwargs):
# set start_time_hanging to 0 for task with task_idx == 1 to make
# time.perf_counter() - state_value.start_time_hanging large
if kwargs.get("task_idx") == 1:
kwargs["start_time_hanging"] = 0.0
# Call the real class with kwargs modified
return RealHangingExecutionState(**kwargs)

with patch(
"ray.data._internal.issue_detection.detectors.hanging_detector.HangingExecutionState"
) as mock_state_cls:
mock_state_cls.side_effect = mock_state_constructor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems hacky to modify internal logic of start_time_hanging, let's just mock the detector and make it a unit test.

Copy link
Contributor Author

@machichima machichima Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For making it into a unit test, should we move it into python/ray/data/tests/unit/?

And for mock the detector, do you mean mock the whole HangingExecutionIssueDetector? I thought we want to test if HangingExecutionIssueDetector.detect works correctly here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For making it into a unit test, should we move it into python/ray/data/tests/unit/?

I think its fine to leave the test here.

We just want to see the issue is produced when the condition is met, therefore we don't need to really execute the dataset.

This will deflake the test and make it deterministic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After second thought I think the better way is we hang certain task instead of modify its start time.

Should do Something like this

        # Create a pipeline with many small blocks to ensure concurrent tasks
        def sleep_task(x):
            if x["id"] == 2:
                time.sleep(1.0)
            return x
...

And remove this part

        def mock_state_constructor(**kwargs):
            # set start_time_hanging to 0 for task with task_idx == 1 to make
            # time.perf_counter() - state_value.start_time_hanging large
            if kwargs.get("task_idx") == 1:
                kwargs["start_time_hanging"] = 0.0
            # Call the real class with kwargs modified
            return RealHangingExecutionState(**kwargs)

        with patch(
            "ray.data._internal.issue_detection.detectors.hanging_detector.HangingExecutionState"
        ) as mock_state_cls:
            mock_state_cls.side_effect = mock_state_constructor

@machichima
Copy link
Contributor Author

Hi @bveeramani
I would like to query about the expectation of this test. I'm happy to discuss if you have time
Thanks!

@bveeramani
Copy link
Member

Hi @bveeramani I would like to query about the expectation of this test. I'm happy to discuss if you have time Thanks!

Hey @machichima, thanks for following up!

I think this test is hard to improve (right now) because the abstractions aren't testable.

I think we should address this by opening three PRs that do the following:

1. Remove the constructor from IssueDetector and introduce a factory method like IssueDetector.for_executor.

Right now, the issue detector base class forces every subclass to use a specific constructor signature that includes the complex StreamingExecutor type. This is problematic because different detectors might not need all of the information in the executor, and it makes tests harder to write because you have to mock all of StreamingExecutor (even if you don't need most of its information!)

2. Trim the dependencies for HangingIssueDetector and update its constructor accordingly.

This keeps the dependency surface small and makes the class easier to reason about and test. Each detector should declare only what it truly needs rather than inheriting a one-size-fits-all set of constructor arguments.

3. Rewrite the hanging-detection test to directly test a HangingIssueDetector instance.

Once the constructor is simplified, tests can instantiate HangingIssueDetector directly with minimal, focused inputs. This avoids the brittle/hacky-ness of trying to mock internal implementation details.

I've sketched out a solution here: #58770 -- what do you think?

@machichima
Copy link
Contributor Author

Hi @bveeramani ,
Thank you for the detailed guideline! It makes sense to me!
I'll give it a go 🙏

bveeramani pushed a commit that referenced this pull request Nov 25, 2025
## Description

Based on the comment here:
#58630 (comment)

Current `IssueDetector` base class requires all its subclasses include
the `StreamingExecutor` as the arguments, making classes hard to mock
and test because we have to mock all of StreamingExecutor.

In this PR, we did following:
1. Remove constructor in `IssueDetector` base class and add
`from_executor()` to setup the class based on the executor
2. Refactor subclasses of `IssueDetector` to use this format

## Related issues

Related to #58562

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: machichima <nary12321@gmail.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
## Description

Based on the comment here:
ray-project#58630 (comment)

Current `IssueDetector` base class requires all its subclasses include
the `StreamingExecutor` as the arguments, making classes hard to mock
and test because we have to mock all of StreamingExecutor.

In this PR, we did following:
1. Remove constructor in `IssueDetector` base class and add
`from_executor()` to setup the class based on the executor
2. Refactor subclasses of `IssueDetector` to use this format

## Related issues

Related to ray-project#58562

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: machichima <nary12321@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data][Flaky] test_hanging_detector_detects_issues intermittently fails to detect hanging

3 participants