-
Notifications
You must be signed in to change notification settings - Fork 7k
[data] Adding in test for issue detection. #58292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] Adding in test for issue detection. #58292
Conversation
Signed-off-by: Matthew Owen <mowen@anyscale.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request adds a test file for issue detection that was missed during a previous upstreaming. The new tests cover hanging execution and high memory usage scenarios. I've provided a few suggestions to improve the test code's robustness and readability, such as ensuring logger cleanup, removing unused fixtures, and replacing magic numbers with constants.
| # Set up logging capture | ||
| log_capture = io.StringIO() | ||
| handler = logging.StreamHandler(log_capture) | ||
| logger = logging.getLogger("ray.data._internal.issue_detection") | ||
| logger.addHandler(handler) | ||
|
|
||
| # Set up mock stats to return values that will trigger adaptive threshold | ||
| mocked_mean = 2.0 # Increase from 0.5 to 2.0 seconds | ||
| mocked_stddev = 0.2 # Increase from 0.05 to 0.2 seconds | ||
| mock_stats = mock_stats_cls.return_value | ||
| mock_stats.count.return_value = 20 # Enough samples | ||
| mock_stats.mean.return_value = mocked_mean | ||
| mock_stats.stddev.return_value = mocked_stddev | ||
|
|
||
| # Set a short issue detection interval for testing | ||
| ctx = DataContext.get_current() | ||
| detector_cfg = ctx.issue_detectors_config.hanging_detector_config | ||
| detector_cfg.detection_time_interval_s = 0.00 | ||
|
|
||
| # test no hanging doesn't log hanging warning | ||
| def f1(x): | ||
| return x | ||
|
|
||
| _ = ray.data.range(1).map(f1).materialize() | ||
|
|
||
| log_output = log_capture.getvalue() | ||
| warn_msg = ( | ||
| r"A task of operator .+ with task index .+ has been running for [\d\.]+s" | ||
| ) | ||
| assert re.search(warn_msg, log_output) is None, log_output | ||
|
|
||
| # # test hanging does log hanging warning | ||
| def f2(x): | ||
| time.sleep(5.0) # Increase from 1.1 to 5.0 seconds to exceed new threshold | ||
| return x | ||
|
|
||
| _ = ray.data.range(1).map(f2).materialize() | ||
|
|
||
| log_output = log_capture.getvalue() | ||
| assert re.search(warn_msg, log_output) is not None, log_output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logging.StreamHandler added to the logger is not removed after the test, which can leak the handler and cause side effects in subsequent tests (e.g., duplicate log messages). It's best practice to ensure cleanup. Using a try...finally block guarantees that logger.removeHandler(handler) is always called, making the test more robust.
# Set up logging capture
log_capture = io.StringIO()
handler = logging.StreamHandler(log_capture)
logger = logging.getLogger("ray.data._internal.issue_detection")
logger.addHandler(handler)
try:
# Set up mock stats to return values that will trigger adaptive threshold
mocked_mean = 2.0 # Increase from 0.5 to 2.0 seconds
mocked_stddev = 0.2 # Increase from 0.05 to 0.2 seconds
mock_stats = mock_stats_cls.return_value
mock_stats.count.return_value = 20 # Enough samples
mock_stats.mean.return_value = mocked_mean
mock_stats.stddev.return_value = mocked_stddev
# Set a short issue detection interval for testing
ctx = DataContext.get_current()
detector_cfg = ctx.issue_detectors_config.hanging_detector_config
detector_cfg.detection_time_interval_s = 0.00
# test no hanging doesn't log hanging warning
def f1(x):
return x
_ = ray.data.range(1).map(f1).materialize()
log_output = log_capture.getvalue()
warn_msg = (
r"A task of operator .+ with task index .+ has been running for [\d\.]+s"
)
assert re.search(warn_msg, log_output) is None, log_output
# # test hanging does log hanging warning
def f2(x):
time.sleep(5.0) # Increase from 1.1 to 5.0 seconds to exceed new threshold
return x
_ = ray.data.range(1).map(f2).materialize()
log_output = log_capture.getvalue()
assert re.search(warn_msg, log_output) is not None, log_output
finally:
logger.removeHandler(handler)| def test_realistic_hanging_detection( | ||
| self, | ||
| ray_start_10_cpus, | ||
| caplog, | ||
| propagate_logs, | ||
| restore_data_context, | ||
| ): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @pytest.mark.parametrize( | ||
| "configured_memory, actual_memory, should_return_issue", | ||
| [ | ||
| # User has appropriately configured memory, so no issue. | ||
| (4 * 1024**3, 4 * 1024**3, False), | ||
| # User hasn't configured memory correctly and memory use is high, so issue. | ||
| (None, 4 * 1024**3, True), | ||
| (1, 4 * 1024**3, True), | ||
| # User hasn't configured memory correctly but memory use is low, so no issue. | ||
| (None, 4 * 1024**3 - 1, False), | ||
| ], | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The magic number 4 * 1024**3 is used multiple times in the test parametrization. This value corresponds to _MEMORY_PER_CORE_ESTIMATE in HighMemoryIssueDetector. To improve readability and maintainability, it's better to define a constant for this value and use it in the parametrize decorator.
_MEMORY_PER_CORE_ESTIMATE = 4 * 1024**3
@pytest.mark.parametrize(
"configured_memory, actual_memory, should_return_issue",
[
# User has appropriately configured memory, so no issue.
(_MEMORY_PER_CORE_ESTIMATE, _MEMORY_PER_CORE_ESTIMATE, False),
# User hasn't configured memory correctly and memory use is high, so issue.
(None, _MEMORY_PER_CORE_ESTIMATE, True),
(1, _MEMORY_PER_CORE_ESTIMATE, True),
# User hasn't configured memory correctly but memory use is low, so no issue.
(None, _MEMORY_PER_CORE_ESTIMATE - 1, False),
],
)Signed-off-by: Matthew Owen <mowen@anyscale.com>
## Description Adding missing test for issue detection ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com>
## Description Adding missing test for issue detection ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com>
## Description Adding missing test for issue detection ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
## Description Adding missing test for issue detection ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
## Description Adding missing test for issue detection ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com>
Description
Adding missing test for issue detection
Related issues
Additional information