Conversation

@jan-janssen
Member

@jan-janssen jan-janssen commented Aug 31, 2025

Summary by CodeRabbit

  • Refactor
    • Streamlined task execution flow with centralized completion handling; internal helpers simplified while public API remains unchanged.
  • Documentation
    • Updated docstrings to reflect removed per-task queue parameter and new helper signatures.
  • Chores
    • Improved error propagation so task failures surface via task results; caching behavior preserved for cached executions.
  • Tests
    • Updated tests to exercise cached execution, assert errors through task result retrieval, and ensure orderly shutdown.

@coderabbitai
Contributor

coderabbitai bot commented Aug 31, 2025

Walkthrough

Removed the per-task future_queue parameter from the internal helpers; the helpers now set each task's Future and return. execute_tasks now centralizes completion signaling by calling _task_done(future_queue=...) after the helper returns. Caching behavior and the public execute_tasks signature are unchanged.
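The flow described in this walkthrough can be sketched with the standard library alone. This is a minimal illustration, not the executorlib implementation: `_run_task` and `worker_loop` are hypothetical stand-ins for the helpers and for execute_tasks, and the `"fn"` / `"args"` / `"shutdown"` dictionary keys are assumed for the example.

```python
import queue
from concurrent.futures import Future


def _run_task(task_dict: dict) -> None:
    """Hypothetical stand-in for the helpers: it only populates the Future."""
    f = task_dict.pop("future")
    if not f.done() and f.set_running_or_notify_cancel():
        try:
            f.set_result(task_dict["fn"](*task_dict.get("args", ())))
        except Exception as exc:
            f.set_exception(exc)


def worker_loop(future_queue: queue.Queue) -> None:
    """Hypothetical stand-in for execute_tasks: it owns every task_done() call."""
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            future_queue.task_done()
            break
        _run_task(task_dict)
        future_queue.task_done()  # single, central completion signal


q = queue.Queue()
ok, bad = Future(), Future()
q.put({"fn": sum, "args": ([1, 2, 3],), "future": ok})
q.put({"fn": sum, "args": (), "future": bad})  # sum() without an argument raises TypeError
q.put({"shutdown": True})
worker_loop(q)
q.join()  # does not block: each get() was matched by exactly one task_done()
```

Because the helper never touches the queue, the failure of the second task is visible only through `bad.exception()` or `bad.result()`, which is the behavior the updated tests assert.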

Changes

| Cohort / File(s) | Change Summary |
| --- | --- |
| Task execution flow & caching: executorlib/task_scheduler/interactive/shared.py | Removed per-task future_queue parameter from _execute_task_without_cache and _execute_task_with_cache. Helpers now accept simplified signatures, populate Futures with results or exceptions, and no longer call _task_done; execute_tasks now calls _task_done(future_queue=...) after helper returns. Cache read/write and result population preserved; docstrings updated. Public API unchanged. |
| Tests: spawn/cache behavior: tests/test_mpiexecspawner.py | Updated tests (test_execute_task_failed_no_argument, test_execute_task_failed_wrong_argument, test_execute_task_cache_failed_no_argument) to enqueue shutdown before execution, call execute_tasks early (with cache_directory where applicable), and assert TypeError by calling f.result() inside with self.assertRaises(TypeError). Maintains q.join() for termination. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller as Caller
  participant Exec as execute_tasks
  participant NoCache as _execute_task_without_cache
  participant Future as Future
  Note over Exec,NoCache: no-cache execution
  Caller->>Exec: submit task
  Exec->>NoCache: run(interface, task_dict)
  NoCache-->>Future: set_result / set_exception
  NoCache-->>Exec: return
  Exec->>Exec: _task_done(future_queue)
  Exec-->>Caller: futures
sequenceDiagram
  autonumber
  participant Caller as Caller
  participant Exec as execute_tasks
  participant CacheH as _execute_task_with_cache
  participant Disk as FileCache
  participant Future as Future
  Note over Exec,CacheH: cached execution
  Caller->>Exec: submit task (cache_key)
  Exec->>CacheH: run(interface, task_dict, cache_directory, cache_key)
  alt cache hit
    CacheH->>Disk: read(cache_file)
    Disk-->>CacheH: data
    CacheH-->>Future: set_result
  else cache miss / compute
    CacheH->>Disk: write(cache_file)
    CacheH-->>Future: set_result / set_exception
  end
  CacheH-->>Exec: return
  Exec->>Exec: _task_done(future_queue)
  Exec-->>Caller: futures

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • remove unused future object #497 — Modifies the same helpers (_execute_task_with_cache / _execute_task_without_cache) and future/queue handling; likely closely related.
  • Write cache first #492 — Also changes task execution and caching helpers and how exceptions/futures are handled; relevant to behavior adjustments.

Suggested reviewers

  • liamhuber

Poem

I hopped through queues and cleaned the trail,
Helpers now hand off, no extra tale.
Cache keeps carrots snug and bright,
Futures sing results in morning light.
Thump—one done, the warren’s right. 🥕🐇

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
executorlib/task_scheduler/interactive/shared.py (2)

76-89: Always call task_done; prevent deadlock and stop after fatal task failures

If _execute_task_with_cache raises (it does re-raise), _task_done is skipped, potentially hanging future_queue.join(). Also, after helpers shut down the interface on error, the loop continues and will use a dead interface. Wrap dispatch in try/finally and break on error.

-        elif "fn" in task_dict and "future" in task_dict:
-            if error_log_file is not None:
-                task_dict["error_log_file"] = error_log_file
-            if cache_directory is None:
-                _execute_task_without_cache(interface=interface, task_dict=task_dict)
-            else:
-                _execute_task_with_cache(
-                    interface=interface,
-                    task_dict=task_dict,
-                    cache_directory=cache_directory,
-                    cache_key=cache_key,
-                )
-            _task_done(future_queue=future_queue)
+        elif "fn" in task_dict and "future" in task_dict:
+            if error_log_file is not None:
+                task_dict["error_log_file"] = error_log_file
+            try:
+                if cache_directory is None:
+                    _execute_task_without_cache(interface=interface, task_dict=task_dict)
+                else:
+                    _execute_task_with_cache(
+                        interface=interface,
+                        task_dict=task_dict,
+                        cache_directory=cache_directory,
+                        cache_key=cache_key,
+                    )
+            except Exception:
+                # Helpers set the Future exception and shut down the interface; stop processing.
+                break
+            finally:
+                _task_done(future_queue=future_queue)
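The try/finally shape suggested above can be exercised in isolation. The sketch below is only an illustration using the standard library: `helper_that_reraises` and `drain` are hypothetical names, and the task dictionary layout is assumed for the example.

```python
import queue
from concurrent.futures import Future


def helper_that_reraises(task_dict: dict) -> None:
    """Hypothetical helper: records the error on the Future, then re-raises."""
    f = task_dict["future"]
    try:
        f.set_result(task_dict["fn"]())
    except Exception as exc:
        f.set_exception(exc)
        raise


def drain(future_queue: queue.Queue) -> None:
    """Hypothetical dispatch loop with the try/except/finally structure."""
    while True:
        task_dict = future_queue.get()
        try:
            if task_dict.get("shutdown", False):
                break
            helper_that_reraises(task_dict)
        except Exception:
            break  # stop processing after a fatal task failure
        finally:
            future_queue.task_done()  # runs on success, on error, and on break


def boom():
    raise TypeError("missing argument")


q = queue.Queue()
f = Future()
q.put({"fn": boom, "future": f})
drain(q)
```

One consequence worth keeping in mind: after the `break`, any items still waiting in the queue are never marked done, so a caller that later invokes `q.join()` would need to drain them first or the join would block.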

151-153: Guard Future state on cache hit to avoid InvalidStateError

On cache hits you call future.set_result without checking cancellation/done state. This can raise and bypass _task_done.

-        _, _, result = get_output(file_name=file_name)
-        future = task_dict["future"]
-        future.set_result(result)
+        _, _, result = get_output(file_name=file_name)
+        future = task_dict.get("future")
+        if future is not None and (not future.done()) and future.set_running_or_notify_cancel():
+            future.set_result(result)
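The failure mode named here is easy to reproduce with a bare `concurrent.futures.Future`. The sketch assumes Python 3.8 or newer, where `set_result` on a cancelled Future raises `InvalidStateError`; `set_result_if_active` is a hypothetical helper name, not part of executorlib.

```python
from concurrent.futures import Future, InvalidStateError

# Unguarded: setting a result on a cancelled Future raises.
cancelled = Future()
cancelled.cancel()  # a pending Future can always be cancelled
try:
    cancelled.set_result("cached value")
    unguarded_outcome = "result set"
except InvalidStateError:
    unguarded_outcome = "InvalidStateError"


def set_result_if_active(future: Future, result) -> bool:
    """Hypothetical guard: deliver the result only if the Future is still live."""
    if not future.done() and future.set_running_or_notify_cancel():
        future.set_result(result)
        return True
    return False


fresh = Future()
delivered_fresh = set_result_if_active(fresh, "cached value")
delivered_cancelled = set_result_if_active(cancelled, "cached value")
```

The `done()` check comes first because `set_running_or_notify_cancel()` is meant to be called once on a pending or cancelled Future; calling it on one that is already running or finished raises instead of returning a flag.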
🧹 Nitpick comments (1)
executorlib/task_scheduler/interactive/shared.py (1)

31-31: Docstring grammar

Tweak phrasing.

-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute tasks in parallel using the message passing interface (MPI).
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5530fb5 and b9806f7.

📒 Files selected for processing (1)
  • executorlib/task_scheduler/interactive/shared.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
executorlib/task_scheduler/interactive/shared.py (3)
executorlib/executor/base.py (1)
  • future_queue (44-51)
executorlib/task_scheduler/base.py (1)
  • future_queue (63-70)
executorlib/standalone/interactive/communication.py (1)
  • SocketInterface (14-153)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
  • GitHub Check: unittest_old
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
  • GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
  • GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
  • GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
  • GitHub Check: unittest_mpich (macos-latest, 3.13)
  • GitHub Check: unittest_win
  • GitHub Check: notebooks
  • GitHub Check: unittest_flux_openmpi
  • GitHub Check: notebooks_integration
  • GitHub Check: unittest_flux_mpich
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)

Comment on lines 100 to 107
f = task_dict.pop("future")
if not f.done() and f.set_running_or_notify_cancel():
    try:
        f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
    except Exception as thread_exception:
        interface.shutdown(wait=True)
        _task_done(future_queue=future_queue)
        f.set_exception(exception=thread_exception)
    else:
        _task_done(future_queue=future_queue)

Contributor

⚠️ Potential issue

Unify exception propagation; re-raise after setting Future exception

Without re-raising here, the outer loop keeps running with a shut-down interface, causing follow-up failures. Make this consistent with the cache path.

     if not f.done() and f.set_running_or_notify_cancel():
         try:
             f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
         except Exception as thread_exception:
             interface.shutdown(wait=True)
             f.set_exception(exception=thread_exception)
+            raise thread_exception
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 f = task_dict.pop("future")
 if not f.done() and f.set_running_or_notify_cancel():
     try:
         f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
     except Exception as thread_exception:
         interface.shutdown(wait=True)
-        _task_done(future_queue=future_queue)
         f.set_exception(exception=thread_exception)
-    else:
-        _task_done(future_queue=future_queue)
+        raise thread_exception
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 100 to 107, the
except block sets the Future exception and shuts down the interface but does not
re-raise, which leaves the outer loop running against a shut-down interface;
update the except block to call interface.shutdown(wait=True), set the exception
on the Future, and then re-raise the caught thread_exception so the surrounding
caller observes the failure, matching the cache path behavior.

@codecov

codecov bot commented Aug 31, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.75%. Comparing base (5530fb5) to head (750b7da).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #795      +/-   ##
==========================================
- Coverage   97.75%   97.75%   -0.01%     
==========================================
  Files          33       33              
  Lines        1473     1468       -5     
==========================================
- Hits         1440     1435       -5     
  Misses         33       33              


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
executorlib/task_scheduler/interactive/shared.py (1)

150-153: Honor cancellation on cache hit to avoid InvalidStateError

On cache hits you call set_result without transitioning the Future to running. If the Future was cancelled, set_result raises InvalidStateError.

-        _, _, result = get_output(file_name=file_name)
-        future = task_dict["future"]
-        future.set_result(result)
+        _, _, result = get_output(file_name=file_name)
+        f = task_dict["future"]
+        if f.set_running_or_notify_cancel():
+            f.set_result(result)
♻️ Duplicate comments (1)
executorlib/task_scheduler/interactive/shared.py (1)

91-107: Exception is swallowed after shutting down the interface; verify contract or short‑circuit subsequent tasks

Helpers set the Future exception and call shutdown(wait=True) but do not re‑raise, so the loop keeps dequeuing and will try to use a shut-down interface until it sees a shutdown sentinel. If downstream always enqueues a shutdown item immediately after a failing task, this is fine—otherwise you’ll churn through remaining tasks with spurious failures.

  • If the design is “errors surface via Future, loop continues,” consider guarding later sends when the interface is down (e.g., detect the closed state and directly set exceptions on subsequent Futures until shutdown is received).
  • Alternatively, re‑raise here and keep the above try/finally in execute_tasks to ensure task_done; tests would need to expect the exception at f.result() or at execute_tasks consistently.

Would you like a small follow-up patch that short-circuits further task execution once the interface has been shut down (while still draining the queue to the shutdown sentinel)?
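One way such a short-circuit could look is sketched below. This is an assumption-laden illustration rather than a proposed patch: `FakeInterface` and its `closed` flag are hypothetical stand-ins for the real interface, the task dictionary layout is assumed, and only `send_and_receive_dict` and `shutdown(wait=True)` mirror names that appear in this review.

```python
import queue
from concurrent.futures import Future


class FakeInterface:
    """Hypothetical stand-in for the real interface; tracks a closed flag."""

    def __init__(self) -> None:
        self.closed = False

    def send_and_receive_dict(self, input_dict: dict):
        return input_dict["fn"](*input_dict.get("args", ()))

    def shutdown(self, wait: bool = True) -> None:
        self.closed = True


def drain_until_shutdown(future_queue: queue.Queue, interface: FakeInterface) -> None:
    """Keep draining to the shutdown sentinel, failing tasks fast once closed."""
    while True:
        task_dict = future_queue.get()
        try:
            if task_dict.get("shutdown", False):
                interface.shutdown(wait=True)
                break
            f = task_dict.pop("future")
            if f.done() or not f.set_running_or_notify_cancel():
                continue  # cancelled before it started
            if interface.closed:
                # Do not touch the dead interface; surface the reason instead.
                f.set_exception(RuntimeError("interface was shut down by an earlier failure"))
                continue
            try:
                f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
            except Exception as exc:
                interface.shutdown(wait=True)
                f.set_exception(exc)
        finally:
            future_queue.task_done()


q = queue.Queue()
first, second = Future(), Future()
q.put({"fn": sum, "args": (), "future": first})          # fails: sum() needs an argument
q.put({"fn": sum, "args": ([1, 2],), "future": second})  # never reaches the interface
q.put({"shutdown": True})
drain_until_shutdown(q, FakeInterface())
```

With this shape every queued item is still matched by a `task_done()` call, so a later `q.join()` returns, while tasks submitted after the failure report a clear error instead of a spurious one from the closed interface.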

🧹 Nitpick comments (3)
executorlib/task_scheduler/interactive/shared.py (2)

79-89: Centralizing task_done is good; wrap helper in try/finally so the queue never hangs if a helper unexpectedly raises

This guarantees task_done is called even on unforeseen errors (e.g., InvalidStateError from Future state transitions).

-            if cache_directory is None:
-                _execute_task_without_cache(interface=interface, task_dict=task_dict)
-            else:
-                _execute_task_with_cache(
-                    interface=interface,
-                    task_dict=task_dict,
-                    cache_directory=cache_directory,
-                    cache_key=cache_key,
-                )
-            _task_done(future_queue=future_queue)
+            try:
+                if cache_directory is None:
+                    _execute_task_without_cache(interface=interface, task_dict=task_dict)
+                else:
+                    _execute_task_with_cache(
+                        interface=interface,
+                        task_dict=task_dict,
+                        cache_directory=cache_directory,
+                        cache_key=cache_key,
+                    )
+            finally:
+                _task_done(future_queue=future_queue)

31-31: Docstring grammar nit

“Execute a single tasks” → “Execute a single task” or “Execute tasks.”

-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute a single task in parallel using the message passing interface (MPI).
tests/test_mpiexecspawner.py (1)

536-547: Minor test consistency suggestion

For readability, consider aligning the non-cached failure test to the same pattern (execute_tasks runs; assert at f.result()).

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between b9806f7 and eb0973f.

📒 Files selected for processing (2)
  • executorlib/task_scheduler/interactive/shared.py (1 hunks)
  • tests/test_mpiexecspawner.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/test_mpiexecspawner.py (2)
executorlib/standalone/serialize.py (1)
  • cloudpickle_register (9-28)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_tasks (16-88)
executorlib/task_scheduler/interactive/shared.py (3)
executorlib/task_scheduler/base.py (1)
  • future_queue (63-70)
executorlib/executor/base.py (1)
  • future_queue (44-51)
executorlib/standalone/interactive/communication.py (1)
  • SocketInterface (14-153)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
  • GitHub Check: unittest_flux_mpich
  • GitHub Check: unittest_flux_openmpi
  • GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
  • GitHub Check: notebooks
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
  • GitHub Check: unittest_old
  • GitHub Check: notebooks_integration
  • GitHub Check: unittest_win
  • GitHub Check: unittest_slurm_mpich
🔇 Additional comments (1)
tests/test_mpiexecspawner.py (1)

538-546: Good: assert via Future on cached error path

Moving the assertion to f.result() matches the new behavior where execute_tasks doesn’t raise and continues to process until shutdown.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
tests/test_mpiexecspawner.py (3)

446-451: Good change: assert on Future exception after execution

Calling execute_tasks(...) first and then asserting the TypeError via f.result() matches the new behavior where helpers set exceptions on the Future. Looks correct.

To avoid a redundant internal Queue.join() followed by the explicit q.join() in the test, either remove the explicit q.join() or pass queue_join_on_shutdown=False here. Suggested inline tweak:

         execute_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
             spawner=MpiExecSpawner,
+            queue_join_on_shutdown=False,
         )

462-467: Same adjustment here is correct

The refactor to check the TypeError from f.result() is consistent with the internal change.

Apply the same optional join tweak to keep the test fully in control of queue joining:

         execute_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
             spawner=MpiExecSpawner,
+            queue_join_on_shutdown=False,
         )

536-547: Cache-path failure test aligns with new semantics

Executing first and asserting the TypeError via the Future is appropriate for the cached path too.

Consider the same join behavior tweak here:

         execute_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
             spawner=MpiExecSpawner,
             cache_directory="executorlib_cache",
+            queue_join_on_shutdown=False,
         )

Optionally, add a quick post-assert check that the Future is marked done (and/or f.exception() is a TypeError) to tighten the signal.
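The optional tightening could look roughly like the following. It is a self-contained sketch: the Future is populated by hand in place of execute_tasks, and `takes_one_argument` and the test class name are hypothetical.

```python
import unittest
from concurrent.futures import Future


def takes_one_argument(i):
    return i + 1


class TestFutureCarriesError(unittest.TestCase):
    def test_error_surfaces_through_future(self):
        f = Future()
        # Stand-in for the worker: call the function without its argument and
        # record the failure on the Future instead of letting it propagate.
        try:
            f.set_result(takes_one_argument())
        except TypeError as exc:
            f.set_exception(exc)

        with self.assertRaises(TypeError):
            f.result()
        # Post-assert checks: the Future is finished and holds the TypeError.
        self.assertTrue(f.done())
        self.assertIsInstance(f.exception(), TypeError)
```

The extra assertions are cheap because `result()` and `exception()` can be called repeatedly on a finished Future without changing its state.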

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between eb0973f and 750b7da.

📒 Files selected for processing (1)
  • tests/test_mpiexecspawner.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mpiexecspawner.py (3)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_tasks (16-88)
executorlib/standalone/interactive/spawner.py (1)
  • MpiExecSpawner (141-158)
executorlib/standalone/serialize.py (1)
  • cloudpickle_register (9-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
  • GitHub Check: unittest_mpich (macos-latest, 3.13)
  • GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
  • GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_flux_openmpi
  • GitHub Check: pip_check
  • GitHub Check: notebooks
  • GitHub Check: unittest_slurm_mpich
  • GitHub Check: unittest_old
  • GitHub Check: notebooks_integration
  • GitHub Check: unittest_flux_mpich
  • GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
  • GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
  • GitHub Check: unittest_win

@jan-janssen jan-janssen merged commit fd826ae into main Aug 31, 2025
62 of 63 checks passed
@jan-janssen jan-janssen deleted the task_done branch August 31, 2025 07:51
2 participants