-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature] resize block allocation executor #589
Conversation
Caution Review failedThe pull request is closed. WalkthroughThe changes introduce a new Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Executor as BlockAllocationExecutor
participant Pool as ThreadPool Manager
Client->>Executor: Set new max_workers value
alt New value < current threads
Executor->>Pool: Insert shutdown commands for excess threads
Pool-->>Executor: Acknowledge thread removal
else New value > current threads
Executor->>Pool: Create and start additional worker threads
Pool-->>Executor: Confirm new threads started
end
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (3)
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
executorlib/interactive/blockallocation.py (1)
76-96
: Consider adding validation and documentation for max_workers setter.The resizing logic is well-implemented, but could benefit from some improvements:
- Add validation for negative or zero values
- Document the behavior in docstring
- Consider using a lock to prevent concurrent modifications
Apply this diff to add the improvements:
@max_workers.setter def max_workers(self, max_workers: int): + """ + Set the maximum number of worker threads. + + Args: + max_workers (int): The new maximum number of worker threads. + Must be a positive integer. + + Raises: + ValueError: If max_workers is less than 1. + """ + if max_workers < 1: + raise ValueError("max_workers must be at least 1") + if self._max_workers > max_workers:tests/test_local_executor_resize.py (1)
12-24
: Add more test coverage for edge cases.While the current tests cover basic increase/decrease scenarios, consider adding tests for:
- Edge case: Setting max_workers to 1
- Error case: Setting max_workers to 0 or negative
- Stress test: Rapid resize operations
Here's a suggested additional test:
def test_invalid_max_workers(self): with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe: with self.assertRaises(ValueError): exe.max_workers = 0 with self.assertRaises(ValueError): exe.max_workers = -1Also applies to: 25-36
executorlib/interactive/shared.py (1)
176-180
: Use contextlib.suppress for cleaner error handling.The error handling can be simplified using
contextlib.suppress
.Apply this diff to improve the implementation:
+from contextlib import suppress def _task_done(future_queue: queue.Queue): - try: + with suppress(ValueError): future_queue.task_done() - except ValueError: - pass🧰 Tools
🪛 Ruff (0.8.2)
177-180: Use
contextlib.suppress(ValueError)
instead oftry
-except
-pass
Replace with
contextlib.suppress(ValueError)
(SIM105)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
executorlib/interactive/blockallocation.py
(2 hunks)executorlib/interactive/shared.py
(3 hunks)tests/test_local_executor_resize.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
executorlib/interactive/shared.py
177-180: Use contextlib.suppress(ValueError)
instead of try
-except
-pass
Replace with contextlib.suppress(ValueError)
(SIM105)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.10)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.10)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_old
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_flux_mpich
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
🔇 Additional comments (1)
executorlib/interactive/blockallocation.py (1)
61-61
: LGTM! Good encapsulation of max_workers.The change from direct usage to a private variable improves encapsulation and enables the resizing functionality.
Also applies to: 68-68
# Conflicts: # executorlib/interactive/blockallocation.py # executorlib/interactive/shared.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
executorlib/interactive/shared.py (1)
177-179
: Add docstring documentation to the new helper function.The new
_task_done
function is well-implemented, but it would benefit from docstring documentation explaining its purpose and the rationale for suppressing ValueError.Add this docstring:
def _task_done(future_queue: queue.Queue): + """ + Safely mark a task as done in the future queue. + + This helper function centralizes the task completion logic and handles the case + where the queue is already empty when task_done() is called, which can happen + during cleanup operations. + + Args: + future_queue (queue.Queue): The queue containing the tasks to be marked as done. + """ with contextlib.suppress(ValueError): future_queue.task_done()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
executorlib/interactive/blockallocation.py
(2 hunks)executorlib/interactive/shared.py
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/interactive/blockallocation.py
🔇 Additional comments (5)
executorlib/interactive/shared.py (5)
61-61
: LGTM! Improved error handling during shutdown.The change enhances robustness by using the new helper function to safely mark tasks as done during shutdown.
121-121
: LGTM! Enhanced error handling during task failure.The change improves robustness by safely marking tasks as done when exceptions occur during task execution.
124-124
: LGTM! Improved task completion handling.The change enhances robustness by safely marking tasks as done after successful execution.
165-165
: LGTM! Enhanced error handling in cached task execution.The changes improve robustness by safely marking tasks as done in both success and failure paths of cached task execution.
Also applies to: 169-169
174-174
: LGTM! Improved task completion handling for cached results.The change enhances robustness by safely marking tasks as done when retrieving cached results.
for more information, see https://pre-commit.ci
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #589 +/- ##
==========================================
+ Coverage 96.14% 96.38% +0.23%
==========================================
Files 28 28
Lines 1194 1245 +51
==========================================
+ Hits 1148 1200 +52
+ Misses 46 45 -1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (6)
tests/test_local_executor_resize.py (3)
43-54
: Add process count verification.Consider adding
self.assertEqual(len(exe._process), 1)
after line 50 to verify the process count matches the new max_workers value, similar to the test without dependencies.
55-68
: Add process count verification.Consider adding
self.assertEqual(len(exe._process), 2)
after line 63 to verify the process count matches the new max_workers value, similar to the test without dependencies.
69-72
: Simplify nested with statements.Consider combining the nested with statements into a single statement for better readability:
- with self.assertRaises(NotImplementedError): - with SingleNodeExecutor(block_allocation=False) as exe: + with self.assertRaises(NotImplementedError), SingleNodeExecutor(block_allocation=False) as exe:🧰 Tools
🪛 Ruff (0.8.2)
70-71: Use a single
with
statement with multiple contexts instead of nestedwith
statements(SIM117)
executorlib/base/executor.py (1)
35-44
: Add docstring for max_workers property.Consider adding a docstring to document the property's behavior:
@property def max_workers(self) -> Optional[int]: + """ + Get the maximum number of worker threads. + + Returns: + Optional[int]: The number of active worker threads if _process is a list, + otherwise the value from _process_kwargs["max_workers"]. + """ if isinstance(self._process, list): return len(self._process) else: return self._process_kwargs.get("max_workers")executorlib/interactive/dependency.py (2)
85-95
: Add docstring for max_workers property.Consider adding a docstring to document the property's behavior:
@property def max_workers(self) -> Optional[int]: + """ + Get the maximum number of worker threads. + + Returns: + Optional[int]: The current maximum number of worker threads, + or None if the future queue is not available. + """ if isinstance(self._future_queue, queue.Queue): f: Future = Future() self._future_queue.queue.insert( 0, {"internal": True, "task": "get_max_workers", "future": f} ) return f.result() else: return None
96-111
: Add docstring for max_workers setter.Consider adding a docstring to document the setter's behavior:
@max_workers.setter def max_workers(self, max_workers: int): + """ + Set the maximum number of worker threads. + + Args: + max_workers (int): The new maximum number of worker threads. + + Raises: + NotImplementedError: If the operation is not supported by the underlying executor. + """ if isinstance(self._future_queue, queue.Queue): f: Future = Future() self._future_queue.queue.insert( 0, { "internal": True, "task": "set_max_workers", "max_workers": max_workers, "future": f, }, ) if not f.result(): raise NotImplementedError("The max_workers setter is not implemented.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
executorlib/base/executor.py
(1 hunks)executorlib/interactive/dependency.py
(2 hunks)tests/test_local_executor_resize.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
tests/test_local_executor_resize.py
70-71: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.10)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.10)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_old
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_flux_mpich
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
🔇 Additional comments (3)
tests/test_local_executor_resize.py (2)
13-26
: LGTM! Test validates worker decrease behavior.The test thoroughly verifies:
- Initial state with 2 workers
- Proper transition to 1 worker
- Task completion and results accuracy
27-42
: LGTM! Test validates worker increase behavior.The test thoroughly verifies:
- Initial state with 1 worker
- Proper transition to 2 workers
- Task completion and results accuracy
executorlib/interactive/dependency.py (1)
220-228
: LGTM! Task handling for max_workers operations.The implementation correctly handles both get and set operations for max_workers, following the existing pattern for internal tasks.
Example:
Summary by CodeRabbit
New Features
Refactor
Tests