Skip to content

Conversation

@machichima
Copy link
Contributor

Description

Based on the comment here: #58630 (comment)

Current IssueDetector base class requires all its subclasses include the StreamingExecutor as the arguments, making classes hard to mock and test because we have to mock all of StreamingExecutor.

In this PR, we did following:

  1. Remove constructor in IssueDetector base class and add from_executor() to setup the class based on the executor
  2. Refactor subclasses of IssueDetector to use this format

Related issues

Related to #58562

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima requested a review from a team as a code owner November 20, 2025 13:37
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the IssueDetector base class and its subclasses to improve testability by decoupling them from StreamingExecutor. The core change is the removal of the constructor dependency on StreamingExecutor and the introduction of a from_executor class method factory pattern. HangingExecutionIssueDetector has been fully refactored to take its dependencies directly in its constructor, making it easy to instantiate in tests. Other detectors like HashShuffleAggregatorIssueDetector and HighMemoryIssueDetector have been adapted to the new factory pattern incrementally, which is a reasonable approach. The refactoring in HangingExecutionIssueDetector also simplifies the task processing logic and includes a fix for cleaning up hanging task state by ensuring _hanging_op_tasks is correctly updated when tasks are no longer active. Overall, the changes are well-executed and improve the codebase's design.

@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Nov 20, 2025
machichima and others added 3 commits November 21, 2025 08:36
…nIssueDetector

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@bveeramani
Copy link
Member

Hey @machichima, do you want a review on this?

Signed-off-by: machichima <nary12321@gmail.com>
@machichima
Copy link
Contributor Author

machichima commented Nov 21, 2025

Hey @machichima, do you want a review on this?

Yes! Thank you so much! Just fix the lint error. This PR is for point 1 and 2 mentioned in #58630 (comment)

ctx = executor._data_context
return cls(
dataset_id=executor._dataset_id,
operators=lambda: list(executor._topology.keys())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we pass the list of operators directly here (Rather than a callable)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think we should either pass a list directly or rename this parameter name? From the name I'm not sure if it will be obvious that this is a callable, maybe something like get_operators_fn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pass the lambda function instead of list due to this issue: #58852 (comment)
I will rename this args to make it more clear!

Copy link
Contributor Author

@machichima machichima Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename it to get_operators_fn in e0564fe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Cursor is correct here. The topology can't change at runtime.

Since it's simpler and also safe to just pass in a list of operators, I think we should do that instead


executor = StreamingExecutor(ctx)
detector = HangingExecutionIssueDetector(executor, ctx)
detector = HangingExecutionIssueDetector.from_executor(executor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we directly test using the constructor, we can simplify this test by removing lines 50 and 48.

Suggested change
detector = HangingExecutionIssueDetector.from_executor(executor)
detector = HangingExecutionIssueDetector(dataset_id="id", ops=[], config=config)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b63591f


return issues

def detect(self) -> List[Issue]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omatthew98 could you help me verify the correctness of this diff?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it lgtm.

Copy link
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, I think we should make a decision if we should allow references to the streaming executor in the detectors (seems like no), and we should align on that across all the detectors. If it doesn't make sense to have the reference in the hanging detector, I am not sure why it would for the other detectors.

def __init__(self, executor: "StreamingExecutor", ctx: "DataContext"):
self._executor = executor
self._ctx = ctx
@classmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's include the implementation from hash_shuffle_detector.py / high_memory_detector.py here?

Although if we are removing the streaming executor reference from HangingExecutionIssueDetector, should we just remove it from all of the implementations of IssueDetector?

Copy link
Contributor Author

@machichima machichima Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I removed the streaming executor ref from other sub-classes of IssueDetector:

  • Remove from HighMemoryIssueDetector : 89242fd
  • Remove from HashShuffleAggregatorIssueDetector: 6ac3247

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate more on what you mean include the implementation from hash_shuffle_detector.py / high_memory_detector.py?

ctx = executor._data_context
return cls(
dataset_id=executor._dataset_id,
operators=lambda: list(executor._topology.keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think we should either pass a list directly or rename this parameter name? From the name I'm not sure if it will be obvious that this is a callable, maybe something like get_operators_fn?

op_task_stats_map[operator.id] = op_metrics._op_task_duration_stats
self._op_id_to_name[operator.id] = operator.name
if op_state._finished:
if operator.execution_finished():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there are minor differences between op_state._finished and operator.execution_finished(), but probably better to use the function here.

for task_idx, state_value in op_state_values.items():
curr_time = time.perf_counter() - state_value.start_time_hanging
if op_task_stats.count() > self._op_task_stats_min_count:
if op_task_stats.count() >= self._op_task_stats_min_count:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OOC was there a reason to add equality here? Just because semantically it makes sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think it's more intuitive using >=. As when we see min_count it's more likely for us to view it as count >= min_count.


return issues

def detect(self) -> List[Issue]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it lgtm.

Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima force-pushed the 58562-remove-issue-detector-constructor branch from 6280de0 to e0564fe Compare November 22, 2025 02:24
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
…ueDetector

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima force-pushed the 58562-remove-issue-detector-constructor branch from 42f6731 to c6bf4f6 Compare November 22, 2025 03:44
Comment on lines 24 to 27
class HashShuffleAggregatorIssueDetectorConfig:
"""Configuration for HashShuffleAggregatorIssueDetector."""
detection_time_interval_s: float = 30.0
min_wait_time_s: float = 300.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add new config dataclass so that we do not need to pass ctx in constructor.
This config class is passed to python/ray/data/_internal/issue_detection/issue_detector_configuration.py, and we pass those values from ctx to this config in python/ray/data/context.py

@machichima
Copy link
Contributor Author

machichima commented Nov 22, 2025

Hi @bveeramani @omatthew98 ,
I fixed the review comments. PTAL
Thank you!

Copy link
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM!

Could you update the implementation to pass in a list of operators directly (Cursor's review was wrong), and then I'll approve the PR?

Comment on lines 93 to 98
# Track if new operators are added after initialization
if op not in self._initial_memory_requests:
self._initial_memory_requests[op] = (
op._get_dynamic_ray_remote_args().get("memory") or 0
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass in the list of operators directly, I don't think we need to do this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Let me remove it!

@bveeramani bveeramani self-assigned this Nov 25, 2025
)
self.issue_detectors_config.hash_shuffle_detector_config.min_wait_time_s = (
self.min_hash_shuffle_aggregator_wait_time_in_s
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Config field changes after initialization are silently ignored

The sync of hash_shuffle_aggregator_health_warning_interval_s and min_hash_shuffle_aggregator_wait_time_in_s to hash_shuffle_detector_config only happens once during __post_init__. Previously, the detector read these values dynamically from self._ctx on each call to detection_time_interval_s() and _should_emit_warning(). Now, if a user modifies these fields after DataContext creation (e.g., ctx.hash_shuffle_aggregator_health_warning_interval_s = 60), the changes are silently ignored and the detector continues using the values captured at initialization. This is a behavioral regression for users who configure these fields after getting the current context.

Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh thanks! I was just thinking about this.

Signed-off-by: machichima <nary12321@gmail.com>
Comment on lines 29 to 31
if TYPE_CHECKING:
pass

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

@bveeramani
Copy link
Member

ty for the contribution! Just lemme know when Ci is passing and I'll merge the PR

Signed-off-by: machichima <nary12321@gmail.com>
@bveeramani bveeramani enabled auto-merge (squash) November 25, 2025 19:03
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Nov 25, 2025
@bveeramani bveeramani merged commit dcf0923 into ray-project:master Nov 25, 2025
7 of 8 checks passed
@bveeramani
Copy link
Member

@machichima PR has been merged!

  1. Rewrite the hanging-detection test to directly test a HangingIssueDetector instance.
    Once the constructor is simplified, tests can instantiate HangingIssueDetector directly with minimal, focused inputs. This avoids the brittle/hacky-ness of trying to mock internal implementation details.

Now that we've refactored the constructor, would you be down to open a follow-up PR and rewrite the test?

@bveeramani bveeramani changed the title [Data] remove issue detector constructor [Data] Remove constructor from IssueDetector base class Nov 26, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
## Description

Based on the comment here:
ray-project#58630 (comment)

Current `IssueDetector` base class requires all its subclasses include
the `StreamingExecutor` as the arguments, making classes hard to mock
and test because we have to mock all of StreamingExecutor.

In this PR, we did following:
1. Remove constructor in `IssueDetector` base class and add
`from_executor()` to setup the class based on the executor
2. Refactor subclasses of `IssueDetector` to use this format

## Related issues

Related to ray-project#58562

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: machichima <nary12321@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants