Refactor interactive task scheduler #798
Walkthrough

Introduces `_execute_multiple_tasks` in `blockallocation` and rewires `BlockAllocationTaskScheduler` threads to it. Reworks `onetoone` to thread-based single-task execution via `execute_task_dict`, using an interface bootstrapped by spawner utilities. Simplifies `shared.py` to a single `execute_task_dict` API and adds `task_done`. Tests updated to import and use `_execute_multiple_tasks` from `blockallocation`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Q as FutureQueue
    participant WT as WorkerThread (_execute_multiple_tasks)
    participant S as Spawner
    participant IF as Interface
    participant T as Task (execute_task_dict)
    Q->>WT: get() task_dict or shutdown
    alt task_dict contains fn/future
        WT->>S: create connections (cores, kwargs)
        WT->>IF: interface_bootup(get_interactive_execute_command, connections)
        WT->>T: execute_task_dict(task_dict, interface, cache/log opts)
        T-->>WT: result/exception set on future
        WT->>Q: task_done()
        WT->>IF: shutdown on error
    else shutdown signal
        WT->>IF: shutdown(wait)
        WT->>Q: task_done()
        opt queue_join_on_shutdown
            WT->>Q: join()
        end
        WT-->>WT: exit
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant Sched as OneProcessTaskScheduler
    participant Th as Thread (_execute_task_in_thread)
    participant S as Spawner
    participant IF as Interface
    participant Exec as execute_task_dict
    Sched->>Th: start with task_dict, cores, opts
    Th->>S: create connections (cores, kwargs)
    Th->>IF: interface_bootup(get_interactive_execute_command, connections)
    Th->>Exec: execute_task_dict(task_dict, interface, cache/log opts)
    Exec-->>Th: set future result/exception
    Th->>IF: shutdown on error
```
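The queue-driven worker loop in the first diagram can be sketched with stdlib primitives. This is an illustrative stand-in, not the library's implementation: `worker_loop` simplifies `_execute_multiple_tasks` to a local function call instead of a socket interface, and the task-dict keys (`fn`, `args`, `future`, `shutdown`) mirror the ones named in the review below.

```python
import queue
import threading
from concurrent.futures import Future

def worker_loop(future_queue: queue.Queue) -> None:
    # Stand-in for the block-allocation worker: pull task dictionaries,
    # execute them, and acknowledge every queue item with task_done().
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown"):
            future_queue.task_done()
            break
        f = task_dict["future"]
        try:
            result = task_dict["fn"](*task_dict.get("args", ()),
                                     **task_dict.get("kwargs", {}))
        except Exception as exc:
            f.set_exception(exc)
        else:
            f.set_result(result)
        finally:
            future_queue.task_done()

q: queue.Queue = queue.Queue()
t = threading.Thread(target=worker_loop, args=(q,))
t.start()
f: Future = Future()
q.put({"fn": lambda x: x + 1, "args": (2,), "future": f})
q.put({"shutdown": True})
q.join()   # returns once the worker has acknowledged both items
t.join()
print(f.result())  # 3
```

The sentinel-dict shutdown and the `task_done()`-per-item pairing are the two invariants the review comments below keep returning to.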
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff           @@
##             main     #798   +/-   ##
=======================================
  Coverage   97.74%   97.75%
=======================================
  Files          32       32
  Lines        1466     1468       +2
=======================================
+ Hits         1433     1435       +2
  Misses         33       33
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
executorlib/task_scheduler/interactive/shared.py (2)
58-65: Do not shut down the interface on per-task exceptions; it kills the worker for subsequent tasks

Calling interface.shutdown() here tears down the long-lived worker. After one task fails, the thread keeps looping with a dead interface and future tasks will fail/hang. Let the backend keep running and only set the future's exception.
Apply:
```diff
@@ def _execute_task_without_cache(interface: SocketInterface, task_dict: dict):
-    except Exception as thread_exception:
-        interface.shutdown(wait=True)
-        f.set_exception(exception=thread_exception)
+    except Exception as thread_exception:
+        f.set_exception(exception=thread_exception)
@@ def _execute_task_with_cache(
-    except Exception as thread_exception:
-        interface.shutdown(wait=True)
-        f.set_exception(exception=thread_exception)
+    except Exception as thread_exception:
+        f.set_exception(exception=thread_exception)
```

Also applies to: 104-106
94-95: Cache hit detection should use an existence check, not membership in a list

Comparing absolute paths to the result of get_cache_files() is brittle. Use os.path.isfile for correctness and performance.

```diff
-    if file_name not in get_cache_files(cache_directory=cache_directory):
+    if not os.path.isfile(file_name):
```

executorlib/task_scheduler/interactive/blockallocation.py (2)
85-92: Infinite loop when shrinking max_workers

Filtering alive threads in a tight `while` without waiting never reduces `len(self._process)` until threads actually exit; the result is a busy-spin/hang.

```diff
-        if self._max_workers > max_workers:
-            for _ in range(self._max_workers - max_workers):
-                self._future_queue.queue.insert(0, {"shutdown": True, "wait": True})
-            while len(self._process) > max_workers:
-                self._process = [
-                    process for process in self._process if process.is_alive()
-                ]
+        if self._max_workers > max_workers:
+            # Ask extra workers to exit
+            for _ in range(self._max_workers - max_workers):
+                self._future_queue.queue.insert(0, {"shutdown": True, "wait": True})
+            # Wait until the desired number of workers remain
+            from time import sleep
+            while True:
+                alive = [p for p in self._process if p.is_alive()]
+                if len(alive) <= max_workers:
+                    self._process = alive
+                    break
+                sleep(0.05)
```
93-101: New workers do not receive a worker_id

Existing threads get worker_id in init, but workers added later don't. Pass stable IDs for observability and resource distribution.

```diff
-        new_process_lst = [
-            Thread(
-                target=_execute_multiple_tasks,
-                kwargs=self._process_kwargs,
-            )
-            for _ in range(max_workers - self._max_workers)
-        ]
+        start_id = self._max_workers
+        new_process_lst = [
+            Thread(
+                target=_execute_multiple_tasks,
+                kwargs=self._process_kwargs | {"worker_id": start_id + i},
+            )
+            for i in range(max_workers - self._max_workers)
+        ]
```

executorlib/task_scheduler/interactive/onetoone.py (1)
218-269: Leak: spawned interfaces are never shut down in _execute_task_in_thread. A new interface is booted per task but not closed, risking orphaned subprocesses and leaked resources. Ensure shutdown in a finally block.
Apply this diff:
```diff
 def _execute_task_in_thread(
@@
-    execute_task_dict(
-        task_dict=task_dict,
-        interface=interface_bootup(
-            command_lst=get_interactive_execute_command(
-                cores=cores,
-            ),
-            connections=spawner(cores=cores, **kwargs),
-            hostname_localhost=hostname_localhost,
-            log_obj_size=log_obj_size,
-            worker_id=worker_id,
-        ),
-        cache_directory=cache_directory,
-        cache_key=cache_key,
-        error_log_file=error_log_file,
-    )
+    interface = interface_bootup(
+        command_lst=get_interactive_execute_command(cores=cores),
+        connections=spawner(cores=cores, **kwargs),
+        hostname_localhost=hostname_localhost,
+        log_obj_size=log_obj_size,
+        worker_id=worker_id,
+    )
+    try:
+        execute_task_dict(
+            task_dict=task_dict,
+            interface=interface,
+            cache_directory=cache_directory,
+            cache_key=cache_key,
+            error_log_file=error_log_file,
+        )
+    finally:
+        # Ensure spawned backend is always torn down
+        interface.shutdown(wait=True)
```
🧹 Nitpick comments (11)
executorlib/task_scheduler/interactive/shared.py (4)
108-110: Ensure Future state transitions are consistent on cache hits

Call set_running_or_notify_cancel() before set_result() for symmetry with the no-cache path.

```diff
-    future = task_dict["future"]
-    future.set_result(result)
+    future = task_dict["future"]
+    if not future.done() and future.set_running_or_notify_cancel():
+        future.set_result(result)
```
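For reference, the stdlib `concurrent.futures.Future` behavior this guard relies on can be checked directly: `set_running_or_notify_cancel()` returns `False` for a cancelled future, so the guarded pattern skips `set_result()` instead of raising.

```python
from concurrent.futures import Future

# Normal path: the guard passes and the result is set.
g = Future()
if not g.done() and g.set_running_or_notify_cancel():
    g.set_result("cached value")
print(g.result())  # cached value

# Cancelled path: a pending Future can be cancelled before any worker
# claims it; done() is then True, so the guard prevents an InvalidStateError.
f = Future()
f.cancel()
if not f.done() and f.set_running_or_notify_cancel():
    f.set_result("cached value")  # never reached
print(f.cancelled())  # True
```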
49-65: Avoid mutating the incoming task_dict; send a payload without 'future'

Popping 'future' only on the cache-miss path makes mutation inconsistent and risks leaking internal keys to the backend if code changes. Build a payload containing only allowed keys.

```diff
-    f = task_dict.pop("future")
+    f = task_dict["future"]
+    payload = {k: task_dict[k] for k in ("fn", "args", "kwargs") if k in task_dict}
@@
-    f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
+    f.set_result(interface.send_and_receive_dict(input_dict=payload))
```

And analogously in the cache-miss block of _execute_task_with_cache.
Also applies to: 86-103
25-29: Docstring default is misleading for cache_directory

Signature defaults to None (no caching), but the docstring says Defaults to "executorlib_cache". Please align the docstring.

```diff
-        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+        cache_directory (str, optional): If provided, directory to store cache files; otherwise no caching.
```
44-46: Silently suppressing task_done() errors can hide bugs

Suppressing ValueError avoids crashes but can mask double-acks. Consider logging once when suppression occurs.
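A minimal sketch of what logged suppression could look like; the logger name is an assumption for illustration, not part of the current module.

```python
import logging
import queue

logger = logging.getLogger("executorlib.shared")  # hypothetical logger name

def task_done(future_queue: queue.Queue) -> None:
    # Acknowledge one queue item; queue.Queue raises
    # ValueError("task_done() called too many times") on a double-ack,
    # which we log instead of silently swallowing.
    try:
        future_queue.task_done()
    except ValueError:
        logger.warning("task_done() called more often than items were queued")

q = queue.Queue()
q.put("task")
q.get()
task_done(q)  # normal acknowledgement
task_done(q)  # double-ack: logged as a warning rather than raised
```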
executorlib/task_scheduler/interactive/blockallocation.py (1)
196-218: Docstring nit: "single tasks" → "tasks from a queue". Minor wording fix.
tests/test_mpiexecspawner.py (1)
13-13: Import/call-site updates look correct; consider avoiding private API in tests

Switching to _execute_multiple_tasks matches the refactor. Relying on a private name in tests is brittle; consider re-exporting a public alias (e.g., execute_multiple_tasks) for test use.
Also applies to: 263-269, 445-450, 461-466, 477-482, 495-500, 518-524, 537-543
tests/test_singlenodeexecutor_shell_executor.py (1)
8-8: Aligned with new entrypoint; same note on private API

The import and invocations are correct. Prefer a public alias over importing a leading-underscore function.
Also applies to: 35-40, 61-66, 88-93
tests/test_singlenodeexecutor_shell_interactive.py (1)
9-9: Updated to new entrypoint and parameters; good

Including spawner and init_function is consistent with the new signature. Same caveat about private API imports.
Also applies to: 91-97
tests/test_fluxpythonspawner.py (1)
114-119: Minor: Avoid redundant queue joins in tests.

_execute_multiple_tasks already joins the queue on shutdown by default (queue_join_on_shutdown=True). The extra q.join() right after the call is redundant.
- Option A: Keep as-is (harmless).
- Option B: Pass queue_join_on_shutdown=False and keep q.join().
- Option C (cleanest): Keep default and drop the trailing q.join().
Also applies to: 129-135
executorlib/task_scheduler/interactive/onetoone.py (2)
71-99: Docstring and shutdown semantics nit.
- Wording: “Execute a single tasks…” → “Execute tasks…”.
- Consider mirroring blockallocation’s queue_join_on_shutdown flag for consistency and test control.
```diff
-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute tasks in parallel using the message passing interface (MPI).
```
210-216: Wrapper now targets a thread function, so the name is misleading. _wrap_execute_task_in_separate_process now starts a thread; consider renaming for clarity (e.g., _submit_task_threaded) in a follow-up to reduce confusion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
- executorlib/task_scheduler/interactive/blockallocation.py (4 hunks)
- executorlib/task_scheduler/interactive/onetoone.py (3 hunks)
- executorlib/task_scheduler/interactive/shared.py (2 hunks)
- tests/test_fluxpythonspawner.py (3 hunks)
- tests/test_mpiexecspawner.py (8 hunks)
- tests/test_singlenodeexecutor_shell_executor.py (4 hunks)
- tests/test_singlenodeexecutor_shell_interactive.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
tests/test_singlenodeexecutor_shell_executor.py (1)
- executorlib/task_scheduler/interactive/blockallocation.py (1): _execute_multiple_tasks (182-250)

tests/test_mpiexecspawner.py (1)
- executorlib/task_scheduler/interactive/blockallocation.py (2): BlockAllocationTaskScheduler (18-179), _execute_multiple_tasks (182-250)

tests/test_fluxpythonspawner.py (1)
- executorlib/task_scheduler/interactive/blockallocation.py (2): BlockAllocationTaskScheduler (18-179), _execute_multiple_tasks (182-250)
executorlib/task_scheduler/interactive/blockallocation.py (5)
- executorlib/standalone/command.py (1): get_interactive_execute_command (94-116)
- executorlib/standalone/inputcheck.py (2): check_resource_dict (63-71), check_resource_dict_is_empty (74-81)
- executorlib/standalone/interactive/communication.py (3): interface_bootup (156-204), send_dict (46-57), shutdown (120-134)
- executorlib/standalone/interactive/spawner.py (4): BaseSpawner (9-59), MpiExecSpawner (141-158), shutdown (42-49), shutdown (117-129)
- executorlib/task_scheduler/interactive/shared.py (2): execute_task_dict (11-41), task_done (44-46)
executorlib/task_scheduler/interactive/shared.py (2)
- executorlib/standalone/interactive/communication.py (1): SocketInterface (14-153)
- executorlib/standalone/serialize.py (1): serialize_funct (31-85)
tests/test_singlenodeexecutor_shell_interactive.py (1)
- executorlib/task_scheduler/interactive/blockallocation.py (1): _execute_multiple_tasks (182-250)
executorlib/task_scheduler/interactive/onetoone.py (4)
- executorlib/standalone/command.py (1): get_interactive_execute_command (94-116)
- executorlib/standalone/interactive/communication.py (1): interface_bootup (156-204)
- executorlib/standalone/interactive/spawner.py (2): BaseSpawner (9-59), MpiExecSpawner (141-158)
- executorlib/task_scheduler/interactive/shared.py (1): execute_task_dict (11-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: unittest_old
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks_integration
🔇 Additional comments (4)
executorlib/task_scheduler/interactive/blockallocation.py (1)
69-71: Switching worker target to _execute_multiple_tasks looks good

The new entrypoint is clearer and keeps orchestration close to block allocation.
tests/test_fluxpythonspawner.py (1)
8-8: Import update to new internal API looks correct. Switching to BlockAllocationTaskScheduler and _execute_multiple_tasks from blockallocation matches the refactor intent.
executorlib/task_scheduler/interactive/onetoone.py (2)
5-6: New imports align with the thread-based execution path. get_interactive_execute_command and interface_bootup are the right dependencies for per-task interface bootstrapping.
65-67: Switching scheduler target to _execute_single_task is consistent. Thread target rename is OK and matches the new control flow.
```python
def _execute_multiple_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute a single tasks in parallel using the message passing interface (MPI).

    Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        init_function (Callable): optional function to preset arguments for functions which are submitted later
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
    """
    interface = interface_bootup(
```
🛠️ Refactor suggestion
Harden worker against unexpected exceptions and ensure cleanup
If an exception escapes the loop, the remote process is left running. Wrap with try/except/finally to always shut down the interface.
```diff
 def _execute_multiple_tasks(
@@
-    interface = interface_bootup(
+    interface = interface_bootup(
@@
-    if init_function is not None:
-        interface.send_dict(
-            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
-        )
-    while True:
-        task_dict = future_queue.get()
-        if "shutdown" in task_dict and task_dict["shutdown"]:
-            interface.shutdown(wait=task_dict["wait"])
-            task_done(future_queue=future_queue)
-            if queue_join_on_shutdown:
-                future_queue.join()
-            break
-        elif "fn" in task_dict and "future" in task_dict:
-            execute_task_dict(
-                task_dict=task_dict,
-                interface=interface,
-                cache_directory=cache_directory,
-                cache_key=cache_key,
-                error_log_file=error_log_file,
-            )
-            task_done(future_queue=future_queue)
+    try:
+        if init_function is not None:
+            interface.send_dict(
+                input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
+            )
+        while True:
+            task_dict = future_queue.get()
+            if "shutdown" in task_dict and task_dict["shutdown"]:
+                interface.shutdown(wait=task_dict["wait"])
+                task_done(future_queue=future_queue)
+                if queue_join_on_shutdown:
+                    future_queue.join()
+                break
+            elif "fn" in task_dict and "future" in task_dict:
+                try:
+                    execute_task_dict(
+                        task_dict=task_dict,
+                        interface=interface,
+                        cache_directory=cache_directory,
+                        cache_key=cache_key,
+                        error_log_file=error_log_file,
+                    )
+                finally:
+                    task_done(future_queue=future_queue)
+    except Exception:
+        # Best-effort cleanup if something unexpected happens
+        interface.shutdown(wait=True)
+        raise
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/blockallocation.py around lines 182 to
221, the function _execute_multiple_tasks can let exceptions escape the task
loop and leave the remote interface/process running; wrap the main execution
loop and any code that can raise in a try/except/finally so that exceptions are
caught, logged to error_log_file (or re-raised after logging) and the interface
is always cleaned up in the finally block (call interface.shutdown() and any
spawner/cleanup routines), ensuring any threads/queues are joined and resources
released even on error.