
Conversation


@jan-janssen jan-janssen commented Aug 31, 2025

Summary by CodeRabbit

  • New Features

    • Workers can now process multiple tasks sequentially with graceful, queue-driven shutdown.
    • Added a single-task execution mode for targeted runs.
    • Optional initialization hooks and worker configuration are supported during startup.
  • Refactor

    • Interactive schedulers updated to use the new worker execution modes.
    • Simplified task payloads by replacing per-task queue mechanics with structured, dictionary-style payloads.
  • Tests

    • Test suites updated to exercise the new execution entry points and options while preserving behavior.


coderabbitai bot commented Aug 31, 2025

Warning

Rate limit exceeded

@jan-janssen has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 8 minutes and 14 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between b67fb12 and b05b2b5.

📒 Files selected for processing (2)
  • executorlib/task_scheduler/interactive/onetoone.py (2 hunks)
  • executorlib/task_scheduler/interactive/shared.py (2 hunks)

Walkthrough

Replaces the single-entry worker API with two entry points: a queue-driven loop execute_multiple_tasks and a single-task runner execute_single_task. Schedulers and tests updated: BlockAllocation workers now use execute_multiple_tasks; OneToOne uses execute_single_task and drops per-task queue plumbing; tests updated to call the new APIs.
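The split described above can be sketched roughly as follows. This is an illustrative reduction, not the executorlib source: the sentinel shape, the task-dict key names (fn, args, kwargs, future), and the worker_loop helper are assumptions modeled on the walkthrough.

```python
# Hedged sketch of a queue-driven multi-task worker in the style of
# execute_multiple_tasks: process task dicts until a shutdown sentinel.
import queue
from concurrent.futures import Future

SHUTDOWN = {"shutdown": True}  # hypothetical sentinel payload

def worker_loop(future_queue: queue.Queue) -> list:
    """Drain task dicts from the queue; return collected results on shutdown."""
    results = []
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown"):
            future_queue.task_done()
            return results
        future = task_dict["future"]
        try:
            result = task_dict["fn"](
                *task_dict.get("args", ()), **task_dict.get("kwargs", {})
            )
            future.set_result(result)
            results.append(result)
        except Exception as exc:
            future.set_exception(exc)
        finally:
            future_queue.task_done()

q = queue.Queue()
f = Future()
q.put({"fn": lambda x: x + 1, "args": (41,), "future": f})
q.put(SHUTDOWN)
out = worker_loop(q)
```

The single-task mode is the degenerate case: run one task dict, set the future, shut down, return.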

Changes

Cohort / File(s): Summary

  • Interactive execution core (executorlib/task_scheduler/interactive/shared.py):
    Rename execute_tasks → execute_multiple_tasks (queue-driven loop with shutdown sentinel); add execute_single_task for one-off execution; preserve interface bootup, optional init_function, error_log propagation, and cache/no-cache execution helpers.
  • Block-allocation scheduler (executorlib/task_scheduler/interactive/blockallocation.py):
    Import updated; worker threads created with target=execute_multiple_tasks in __init__ and in max_workers scaling.
  • One-to-one scheduler (executorlib/task_scheduler/interactive/onetoone.py):
    Import updated; thread target switched to execute_single_task; removed per-task queue objects (qtask, qtask_lst) and updated the _wrap_execute_task_in_separate_process signature to stop passing a qtask and instead pass task_dict in kwargs.
  • Tests: Flux spawner (tests/test_fluxpythonspawner.py):
    Update import and call sites from execute_tasks → execute_multiple_tasks (arguments unchanged).
  • Tests: MPI exec spawner (tests/test_mpiexecspawner.py):
    Update import and all call sites from execute_tasks → execute_multiple_tasks (arguments unchanged).
  • Tests: single-node shell executor (tests/test_singlenodeexecutor_shell_executor.py):
    Replace execute_tasks with execute_multiple_tasks across tests (arguments unchanged).
  • Tests: single-node shell interactive (tests/test_singlenodeexecutor_shell_interactive.py):
    Replace execute_tasks with execute_multiple_tasks; call sites updated to include spawner and init_function kwargs where required.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant T as Thread (Worker)
  participant S as execute_multiple_tasks
  participant Q as future_queue
  participant IF as Interface
  participant FN as Task fn
  participant F as Future

  T->>S: start(worker_id, kwargs…)
  S->>IF: interface_bootup(spawner, commands, …)
  alt init_function present
    S->>IF: init_function()
  end
  loop until shutdown sentinel
    S->>Q: get()
    alt shutdown sentinel
      S->>IF: shutdown(wait, join_queue)
      S-->>T: return
    else task_dict
      S->>FN: execute (with/without cache)
      FN-->>S: result / exception
      S->>F: set_result / set_exception
      S->>Q: task_done()
    end
  end
sequenceDiagram
  autonumber
  participant T as Thread (Worker)
  participant S1 as execute_single_task
  participant IF as Interface
  participant FN as Task fn
  participant F as Future

  T->>S1: start(task_dict, kwargs…)
  S1->>IF: interface_bootup(…)
  alt init_function present
    S1->>IF: init_function()
  end
  S1->>FN: execute (with/without cache)
  FN-->>S1: result / exception
  S1->>F: set_result / set_exception
  S1->>IF: shutdown(wait)
  S1-->>T: return
sequenceDiagram
  autonumber
  participant BA as BlockAllocationTaskScheduler
  participant Th as Thread
  participant S as execute_multiple_tasks

  BA->>Th: Thread(target=S, kwargs=…)
  Th->>S: run queue-driven worker
  BA->>Th: scale up via max_workers.setter → new Thread(target=S)
sequenceDiagram
  autonumber
  participant O as OneToOneTaskScheduler
  participant Th as Thread
  participant S1 as execute_single_task

  O->>Th: Thread(target=S1, kwargs={task_dict,…})
  Th->>S1: run single-task worker (no qtask)
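The wiring in the two scheduler diagrams can be sketched like so. The execute_* functions here are simplified stand-ins, not the real entry points, kept only detailed enough to show the Thread(target=..., kwargs=...) pattern for both modes.

```python
# Sketch: schedulers hand either a queue (multi-task) or a task dict
# (single-task) to a worker thread via kwargs.
import queue
import threading

def execute_multiple_tasks(future_queue, worker_id=0):
    # Stub: drain the queue until a None sentinel arrives.
    while True:
        item = future_queue.get()
        future_queue.task_done()
        if item is None:
            return

def execute_single_task(task_dict, result_store):
    # Stub: run exactly one task dict and record the result.
    result_store.append(task_dict["fn"](*task_dict["args"]))

# BlockAllocation style: long-lived worker fed by a queue.
q = queue.Queue()
block_worker = threading.Thread(
    target=execute_multiple_tasks,
    kwargs={"future_queue": q, "worker_id": 0},
)
block_worker.start()
q.put(None)  # shutdown sentinel
block_worker.join()

# OneToOne style: one thread per task, no per-task queue plumbing.
store = []
one_shot = threading.Thread(
    target=execute_single_task,
    kwargs={"task_dict": {"fn": sum, "args": ([1, 2, 3],)},
            "result_store": store},
)
one_shot.start()
one_shot.join()
```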

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Poem

I hop through queues with tidy care,
From single nibble to multi-hare.
Threads now point where workers meet,
One-to-one or block’s heartbeat.
Shutdown whispers, futures gleam— carrots cached, result supreme. 🥕✨



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
tests/test_mpiexecspawner.py (2)

446-451: Leftover calls to removed execute_tasks will fail. Replace with execute_multiple_tasks.

These tests will error at runtime.

-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
             spawner=MpiExecSpawner,
         )

Repeat the same replacement at both remaining call sites.

Also applies to: 462-467, 538-544


446-447: Remove remaining execute_tasks calls in tests

  • tests/test_mpiexecspawner.py: lines 446–448
  • tests/test_mpiexecspawner.py: lines 462–464
  • tests/test_mpiexecspawner.py: lines 538–540

Replace or remove these per the updated API.

executorlib/task_scheduler/interactive/blockallocation.py (1)

91-97: Propagate worker_id when scaling up workers

New threads created during scale-up don’t receive a worker_id, unlike those from init. Given execute_multiple_tasks supports worker_id (used for logging/resource distribution), keep this consistent.

Apply:

-            elif self._max_workers < max_workers:
-                new_process_lst = [
-                    Thread(
-                        target=execute_multiple_tasks,
-                        kwargs=self._process_kwargs,
-                    )
-                    for _ in range(max_workers - self._max_workers)
-                ]
+            elif self._max_workers < max_workers:
+                start_id = len(self._process)
+                new_process_lst = [
+                    Thread(
+                        target=execute_multiple_tasks,
+                        kwargs=self._process_kwargs | {"worker_id": start_id + idx},
+                        name=f"BlockAllocWorker-{start_id + idx}",
+                    )
+                    for idx in range(max_workers - self._max_workers)
+                ]
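The kwargs pattern in the suggestion above relies on the dict union operator (Python >= 3.9), which builds a new dict per worker without mutating the shared kwargs. The keys and values below are placeholders, not the scheduler's real kwargs:

```python
# Illustrative only: merge shared process kwargs with a per-worker id.
process_kwargs = {"cores": 1, "spawner": "MpiExecSpawner"}  # placeholder values
existing_workers = 2  # threads already running before scale-up
new_workers = 2

per_worker_kwargs = [
    process_kwargs | {"worker_id": existing_workers + idx}
    for idx in range(new_workers)
]
```

Because `|` returns a fresh dict, the shared process_kwargs stays untouched while each new thread gets its own worker_id.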
executorlib/task_scheduler/interactive/onetoone.py (1)

200-209: Don’t override user-provided init_function

Setting init_function=None here discards any value supplied via executor_kwargs, which is likely unintended and a behavior change.

Apply:

     task_kwargs.update(
         {
-            "task_dict": task_dict,
-            "spawner": spawner,
-            "hostname_localhost": hostname_localhost,
-            "init_function": None,
+            "task_dict": task_dict,
+            "spawner": spawner,
+            "hostname_localhost": hostname_localhost,
         }
     )
🧹 Nitpick comments (4)
executorlib/task_scheduler/interactive/shared.py (2)

105-128: Fix docstring: wrong entity and incorrect Args for execute_single_task.

Use “single task” and document task_dict; drop queue_join_on_shutdown which isn’t an argument.

-    """
-    Execute a single tasks in parallel using the message passing interface (MPI).
+    """
+    Execute a single task in parallel using the message passing interface (MPI).
@@
-       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+       task_dict (dict): Dictionary describing the task submitted to the parallel process
+                         {"fn": Callable, "args": (), "kwargs": {}, "future": Future, "resource_dict": {}}
@@
-       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.

31-54: Fix docstring: “single tasks” → “multiple tasks” and clarify caching default.

Minor user-facing text accuracy.

-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute multiple tasks in parallel using the message passing interface (MPI).
@@
-       cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+       cache_directory (str, optional): Directory to store cache files. If None, caching is disabled.
executorlib/task_scheduler/interactive/blockallocation.py (1)

66-69: Name worker threads for easier debugging

Optional, but naming threads makes tracing/logging simpler.

Apply:

 Thread(
-    target=execute_multiple_tasks,
-    kwargs=executor_kwargs | {"worker_id": worker_id},
+    target=execute_multiple_tasks,
+    kwargs=executor_kwargs | {"worker_id": worker_id},
+    name=f"BlockAllocWorker-{worker_id}",
 )
executorlib/task_scheduler/interactive/onetoone.py (1)

210-213: Name the per-task thread

A thread name helps trace single-task executions.

Apply:

-    process = Thread(
-        target=execute_single_task,
-        kwargs=task_kwargs,
-    )
+    process = Thread(
+        target=execute_single_task,
+        kwargs=task_kwargs,
+        name=f"OneToOneTask-{id(task_dict.get('future'))}",
+    )
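As a small aside on the naming suggestion: a thread's name is visible from inside the thread via threading.current_thread().name, which is what makes it useful in logs and tracebacks. A minimal demo:

```python
# Named threads surface their name both on the Thread object and
# from inside the running target.
import threading

seen = []

def task():
    seen.append(threading.current_thread().name)

t = threading.Thread(target=task, name="OneToOneTask-demo")
t.start()
t.join()
```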
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fd826ae and 9b2e295.

📒 Files selected for processing (7)
  • executorlib/task_scheduler/interactive/blockallocation.py (3 hunks)
  • executorlib/task_scheduler/interactive/onetoone.py (2 hunks)
  • executorlib/task_scheduler/interactive/shared.py (2 hunks)
  • tests/test_fluxpythonspawner.py (3 hunks)
  • tests/test_mpiexecspawner.py (5 hunks)
  • tests/test_singlenodeexecutor_shell_executor.py (4 hunks)
  • tests/test_singlenodeexecutor_shell_interactive.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
tests/test_singlenodeexecutor_shell_executor.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
tests/test_fluxpythonspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
tests/test_singlenodeexecutor_shell_interactive.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
executorlib/task_scheduler/interactive/shared.py (3)
executorlib/standalone/interactive/spawner.py (2)
  • BaseSpawner (9-59)
  • MpiExecSpawner (141-158)
executorlib/standalone/interactive/communication.py (2)
  • interface_bootup (156-204)
  • send_dict (46-57)
executorlib/standalone/command.py (1)
  • get_interactive_execute_command (92-114)
tests/test_mpiexecspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
executorlib/task_scheduler/interactive/blockallocation.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
executorlib/task_scheduler/interactive/onetoone.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_single_task (91-152)
🔇 Additional comments (8)
executorlib/task_scheduler/interactive/shared.py (1)

68-89: LGTM: loop, sentinel handling, and per-task completion are clear.

The multi-task loop correctly handles shutdown, optional queue join, and per-task execution/caching.

tests/test_singlenodeexecutor_shell_executor.py (1)

35-40: LGTM: tests migrated to execute_multiple_tasks.

Rename is consistent; behavior unchanged.

Also applies to: 61-66, 89-93

tests/test_singlenodeexecutor_shell_interactive.py (1)

91-97: LGTM: execute_multiple_tasks with spawner and init_function.

Covers the new API surface well.

tests/test_fluxpythonspawner.py (1)

115-121: LGTM: Flux tests updated to execute_multiple_tasks.

Signature and kwargs remain correct.

Also applies to: 130-136

tests/test_mpiexecspawner.py (2)

13-14: LGTM: import updated to execute_multiple_tasks.


479-483: LGTM: renamed calls already correct in other tests here.

Also applies to: 496-501, 519-526

executorlib/task_scheduler/interactive/blockallocation.py (1)

13-13: Switch to execute_multiple_tasks: LGTM

Import aligns with the new API. No issues spotted.

executorlib/task_scheduler/interactive/onetoone.py (1)

7-7: Switch to execute_single_task: LGTM

Import matches the refactor intent.

Comment on lines +142 to +153
    if error_log_file is not None:
        task_dict["error_log_file"] = error_log_file
    if cache_directory is None:
        _execute_task_without_cache(interface=interface, task_dict=task_dict)
    else:
        _execute_task_with_cache(
            interface=interface,
            task_dict=task_dict,
            cache_directory=cache_directory,
            cache_key=cache_key,
        )


⚠️ Potential issue

execute_single_task leaks the interface; always shutdown in success path.

If no exception occurs, the interface process remains running. Ensure shutdown in a finally block.

-    if error_log_file is not None:
-        task_dict["error_log_file"] = error_log_file
-    if cache_directory is None:
-        _execute_task_without_cache(interface=interface, task_dict=task_dict)
-    else:
-        _execute_task_with_cache(
-            interface=interface,
-            task_dict=task_dict,
-            cache_directory=cache_directory,
-            cache_key=cache_key,
-        )
+    try:
+        if error_log_file is not None:
+            task_dict["error_log_file"] = error_log_file
+        if cache_directory is None:
+            _execute_task_without_cache(interface=interface, task_dict=task_dict)
+        else:
+            _execute_task_with_cache(
+                interface=interface,
+                task_dict=task_dict,
+                cache_directory=cache_directory,
+                cache_key=cache_key,
+            )
+    finally:
+        # Be robust if helpers already shut down on exception
+        with contextlib.suppress(Exception):
+            interface.shutdown(wait=True)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 142 to 153, the
code calls either _execute_task_without_cache or _execute_task_with_cache but
does not stop the interface on the success path, leaking the interface process;
wrap the execution calls in a try/finally so that interface.shutdown() (or the
appropriate stop/close method on the interface) is always invoked after the task
completes or raises, preserving the existing error_log_file and cache logic and
ensuring shutdown occurs in both success and exception cases.
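The always-shutdown pattern recommended here boils down to a try/finally around the task body. The Interface class below is a minimal stand-in for the real interface object, used only to show that shutdown runs on both the success and the error path, and that contextlib.suppress tolerates a helper having already shut it down:

```python
# Sketch of the fix: guarantee interface shutdown regardless of task outcome.
import contextlib

class Interface:
    """Stand-in for the real interface: tracks whether it is running."""
    def __init__(self):
        self.running = True

    def shutdown(self, wait=True):
        if not self.running:
            raise RuntimeError("already shut down")
        self.running = False

def run_single(interface, fn, *args):
    try:
        return fn(*args)
    finally:
        # Be robust if a helper already shut down on exception.
        with contextlib.suppress(Exception):
            interface.shutdown(wait=True)

# Success path: result returned, interface released.
iface = Interface()
value = run_single(iface, lambda x: x * 2, 21)

# Error path: exception propagates, interface still released.
iface2 = Interface()
error = None
try:
    run_single(iface2, lambda: 1 / 0)
except ZeroDivisionError as exc:
    error = exc
```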


codecov bot commented Aug 31, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.75%. Comparing base (fd826ae) to head (b05b2b5).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #796      +/-   ##
==========================================
- Coverage   97.75%   97.75%   -0.01%     
==========================================
  Files          33       33              
  Lines        1468     1467       -1     
==========================================
- Hits         1435     1434       -1     
  Misses         33       33              

☔ View full report in Codecov by Sentry.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
tests/test_mpiexecspawner.py (2)

264-270: Avoid double-joining the queue.

execute_multiple_tasks already joins the queue by default (queue_join_on_shutdown=True). Either disable that here or drop the explicit q.join() later to shave test time. Same nit applies to the other updated calls in this file.

Option A (disable internal join):

 execute_multiple_tasks(
     future_queue=q,
     cores=1,
     openmpi_oversubscribe=False,
     spawner=MpiExecSpawner,
     init_function=set_global,
+    queue_join_on_shutdown=False,
 )

Option B (keep default, remove explicit join later):

-        q.join()
+        # q.join() not needed; execute_multiple_tasks already joined the queue.
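For context on why the double join is redundant rather than harmful: queue.Queue.join() blocks until every put() has been balanced by a task_done(), and any later join() on a fully drained queue returns immediately. A minimal demo:

```python
# Demonstrate queue join semantics: join() waits for outstanding task_done()
# calls; a second join() on an already-balanced queue is a no-op.
import queue
import threading

q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            return
        processed.append(item)
        q.task_done()

t = threading.Thread(target=worker)
t.start()
for i in range(3):
    q.put(i)
q.put(None)  # shutdown sentinel
q.join()     # blocks until all four items are marked done
q.join()     # returns immediately: nothing outstanding
t.join()
```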

478-483: Consider adding a targeted test for execute_single_task.

Since the PR introduces execute_single_task, add a minimal happy-path test to pin its behavior (e.g., same calc_array(i=2) flow) to guard future regressions.

I can draft a small test method mirroring this case using execute_single_task if you want.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 9b2e295 and b67fb12.

📒 Files selected for processing (1)
  • tests/test_mpiexecspawner.py (8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mpiexecspawner.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_multiple_tasks (16-88)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
  • GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: unittest_slurm_mpich
  • GitHub Check: notebooks_integration
  • GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_flux_mpich
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
  • GitHub Check: unittest_win
🔇 Additional comments (6)
tests/test_mpiexecspawner.py (6)

446-451: Failure-path test updated to new API is correct.

Covers missing required arg, exception propagation via Future remains intact.


462-467: Wrong-argument failure-path test looks good.

Validates TypeError surfacing through Future with the renamed runner.


519-525: Cache-mode success-path updated correctly.

Matches new API and uses tearDown to clean the cache dir.


538-544: Cache-mode failure-path test looks good.

Exercises error propagation with caching enabled against the renamed entry point.


13-13: Approve changes — no lingering execute_tasks references found.


496-501: No action needed: openmpi_oversubscribe is correctly forwarded, and MpiExecSpawner's command builder (generate_mpiexec_command) still checks openmpi_oversubscribe and adds --oversubscribe when true.

@jan-janssen jan-janssen merged commit 2110b7e into main Aug 31, 2025
62 of 63 checks passed
@jan-janssen jan-janssen deleted the execute_single_task branch August 31, 2025 10:13
