Implement splitting of future objects #819
Caution: Review failed. The pull request is closed.
Walkthrough
Adds a new standalone selector utility providing FutureSelector, split_future, and get_item_from_future; exposes split_future and get_item_from_future via executorlib/__init__.py; introduces tests validating selection, splitting, and error propagation with SingleNodeExecutor and plain Future usage.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor U as User Code
participant E as SingleNodeExecutor
participant F as Future
participant FS as FutureSelector
U->>E: submit(func)
E-->>U: Future (F)
U->>U: split_future(F, n) / get_item_from_future(F, key)
U-->>U: FutureSelector(s) (FS)
note over FS,F: FS delegates cancel/done/exception/etc. to F
F-->>E: completes with result (tuple/dict) or error
alt success
FS-->>U: result() => result[selector]
else failure
FS-->>U: result() raises underlying exception
end
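The flow in the walkthrough and diagram can be sketched with the standard library alone. This is a minimal illustration, not the code merged in this PR: the names FutureSelector, split_future, and get_item_from_future come from the walkthrough, while the method bodies and signatures below are assumptions.

```python
from concurrent.futures import Future
from typing import Any, Optional, Union


class FutureSelector(Future):
    """Illustrative view onto one item of another future's result."""

    def __init__(self, future: Future, selector: Union[int, str]):
        super().__init__()
        self._future = future
        self._selector = selector

    # State queries and cancellation are delegated to the shared future.
    def done(self) -> bool:
        return self._future.done()

    def cancel(self) -> bool:
        return self._future.cancel()

    def cancelled(self) -> bool:
        return self._future.cancelled()

    def exception(self, timeout: Optional[float] = None):
        return self._future.exception(timeout=timeout)

    def result(self, timeout: Optional[float] = None) -> Any:
        # Raises the underlying exception if there is one, else indexes the result.
        return self._future.result(timeout=timeout)[self._selector]


def split_future(future: Future, n: int) -> list:
    # One selector per positional item of a tuple/list result.
    return [FutureSelector(future, i) for i in range(n)]


def get_item_from_future(future: Future, key: str) -> FutureSelector:
    # Single selector for a dict-like result.
    return FutureSelector(future, key)


source = Future()
first, second = split_future(source, n=2)
source.set_result(("a", "b"))
pair = (first.result(), second.result())
```

Every selector shares the one underlying future, which is why completion, failure, and cancellation are visible through all of them at once.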
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
🧹 Nitpick comments (6)
executorlib/standalone/split.py (4)
26-27: Harden result() against non-sequence results and out-of-range selectors.
Improve the error message and avoid obscure TypeError/IndexError surfaces from indexing.
-    def result(self, timeout: Optional[float] = None) -> Any:
-        return self._future.result(timeout=timeout)[self._selector]
+    def result(self, timeout: Optional[float] = None) -> Any:
+        res = self._future.result(timeout=timeout)
+        try:
+            return res[self._selector]
+        except (TypeError, IndexError) as e:
+            raise IndexError(
+                f"SplitFuture selector {self._selector} is invalid for result of type {type(res).__name__}"
+            ) from e
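The hardening idea can be tried in isolation from the future machinery. The helper below is a hypothetical sketch (select_item is not part of executorlib); unlike the suggestion above it also catches KeyError, so dict results with string selectors are covered, and it raises LookupError, the common base class of IndexError and KeyError.

```python
from typing import Any, Union


def select_item(res: Any, selector: Union[int, str]) -> Any:
    """Index res with selector, adding context to lookup failures."""
    try:
        return res[selector]
    except (TypeError, IndexError, KeyError) as e:
        # TypeError: res is not subscriptable; IndexError/KeyError: bad selector.
        raise LookupError(
            f"selector {selector!r} is invalid for result of type {type(res).__name__}"
        ) from e
```

Chaining with `from e` keeps the original exception available as `__cause__` for debugging.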
35-39: Discourage mutation APIs on the wrapper.
Forwarding set_result/set_exception exposes a sharp edge: accidentally setting a per-split value corrupts the underlying future’s result shape for all splits. Prefer blocking these on SplitFuture.
-    def set_result(self, result: Any) -> None:
-        return self._future.set_result(result=result)
+    def set_result(self, result: Any) -> None:
+        raise RuntimeError("set_result is not supported on SplitFuture; mutate the underlying Future instead")
-    def set_exception(self, exception: BaseException) -> None:
-        return self._future.set_exception(exception=exception)
+    def set_exception(self, exception: BaseException) -> None:
+        raise RuntimeError("set_exception is not supported on SplitFuture; mutate the underlying Future instead")
11-21: Cancellation semantics are “all-or-nothing.”
Calling cancel() on any SplitFuture cancels the underlying future (and thus all splits). This is acceptable, but please document this behavior in the docstring to prevent surprises.
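The all-or-nothing behavior is easy to observe with a plain standard-library Future. The SplitView class below is a hypothetical stand-in that only models the delegation of cancel() and cancelled(); it is not the PR's SplitFuture.

```python
from concurrent.futures import Future


class SplitView:
    """Hypothetical minimal wrapper that forwards cancellation to a shared future."""

    def __init__(self, future: Future, selector: int):
        self._future = future
        self._selector = selector

    def cancel(self) -> bool:
        return self._future.cancel()

    def cancelled(self) -> bool:
        return self._future.cancelled()


shared = Future()  # still pending, so cancel() can succeed
left, right = SplitView(shared, 0), SplitView(shared, 1)
cancel_ok = left.cancel()  # cancelling one view cancels the shared future
```

After `left.cancel()`, `right.cancelled()` is also true, because both views consult the same underlying future.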
42-43: Add return type and validate n.
Minor polish: type the return value and guard against negative n.
-from concurrent.futures import Future
-from typing import Any, Optional
+from concurrent.futures import Future
+from typing import Any, Optional, List
@@
-def split(future: Future, n: int):
-    return [SplitFuture(future=future, selector=i) for i in range(n)]
+def split(future: Future, n: int) -> List[SplitFuture]:
+    if n < 0:
+        raise ValueError("n must be >= 0")
+    return [SplitFuture(future=future, selector=i) for i in range(n)]
tests/test_standalone_split.py (2)
26-31: Strengthen exception test to cover all splits.
Ensure exceptions propagate uniformly to every split.
     def test_integration_exception(self):
         with SingleNodeExecutor() as exe:
             future = exe.submit(function_with_exception, 15)
             f1, f2, f3 = split(future=future, n=3)
-            with self.assertRaises(RuntimeError):
-                f3.result()
+            for f in (f1, f2, f3):
+                with self.assertRaises(RuntimeError):
+                    f.result()
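The reason every split sees the same error is that result() on the underlying future raises before any indexing happens. A small sketch, assuming the standard-library ThreadPoolExecutor in place of SingleNodeExecutor and a hypothetical item_result helper in place of SplitFuture.result():

```python
from concurrent.futures import Future, ThreadPoolExecutor


def failing_task(_):
    raise RuntimeError("boom")


def item_result(future: Future, index: int):
    # Stand-in for a split's result(): future.result() raises first,
    # so the index is never applied when the task failed.
    return future.result()[index]


errors = []
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(failing_task, 15)
    for index in range(3):
        try:
            item_result(future, index)
        except RuntimeError as exc:
            errors.append(str(exc))
```

Each of the three lookups records the same RuntimeError, which is the behavior the strengthened test asserts.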
14-24: Add a callback semantics test (callback receives the SplitFuture).
Validates the wrapper behavior proposed for add_done_callback/remove_done_callback.
    def test_callback_receives_wrapper(self):
        with SingleNodeExecutor() as exe:
            future = exe.submit(function_with_multiple_outputs, 1)
            f1, f2, f3 = split(future=future, n=3)
            seen = {}
            def cb(f):
                seen["is_wrapper"] = (f is f2)
            f2.add_done_callback(cb)
            _ = f2.result()
            self.assertTrue(seen.get("is_wrapper", False))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
executorlib/standalone/split.py (1 hunks)
tests/test_standalone_split.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_standalone_split.py (2)
executorlib/executor/single.py (1)
SingleNodeExecutor (20-190)
executorlib/standalone/split.py (3)
split (42-43), result (26-27), done (20-21)
🪛 Ruff (0.12.2)
tests/test_standalone_split.py
10-10: Unused function argument: i
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: minimal
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_flux_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
🔇 Additional comments (1)
tests/test_standalone_split.py (1)
16-24: LGTM for base integration. Happy path behavior and done() checks look good.
executorlib/standalone/split.py (outdated)
class SplitFuture(Future):
    def __init__(self, future: Future, selector: int):
        super().__init__()
        self._future = future
        self._selector = selector
🛠️ Refactor suggestion
Align wrapper semantics with Future contract (track callbacks per wrapper).
Subclassing Future while delegating to an underlying Future is fine, but the current design leaks the underlying Future into user callbacks and makes remove_done_callback ineffective on the wrapper. Track per-wrapper callbacks and invoke user callbacks with the SplitFuture instance.
 class SplitFuture(Future):
     def __init__(self, future: Future, selector: int):
         super().__init__()
         self._future = future
         self._selector = selector
+        # Map original callback -> wrapped callback for proper removal
+        self._cb_map: dict = {}
🤖 Prompt for AI Agents
In executorlib/standalone/split.py around lines 5-10, the SplitFuture currently
delegates to an underlying Future but forwards user callbacks directly to that
underlying Future, which leaks the underlying Future into callbacks and makes
remove_done_callback ineffective; to fix, have SplitFuture track per-wrapper
callbacks: implement add_done_callback to wrap the user callback in an internal
callback that calls the user callback with self (the SplitFuture) and register
that internal callback on the underlying Future while storing a mapping from the
user callback to the internal wrapper; implement remove_done_callback to look up
and remove the internal wrapper from the underlying Future and delete the
mapping; ensure done()/result()/exception()/cancel()/cancelled() still delegate
to the underlying Future but user-facing callbacks always receive the
SplitFuture instance.
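The callback-wrapping step described in this prompt can be sketched as follows. CallbackForwardingView is a hypothetical, stripped-down wrapper used only for illustration; it shows the adapter that hands the wrapper, rather than the underlying future, to the user's callback.

```python
from concurrent.futures import Future


class CallbackForwardingView:
    """Hypothetical wrapper whose callbacks receive the wrapper itself."""

    def __init__(self, future: Future, selector):
        self._future = future
        self._selector = selector

    def result(self, timeout=None):
        return self._future.result(timeout=timeout)[self._selector]

    def add_done_callback(self, fn) -> None:
        def _adapter(_underlying):
            # Ignore the underlying future and pass this view instead.
            fn(self)

        self._future.add_done_callback(_adapter)


inner = Future()
view = CallbackForwardingView(inner, 1)
received = []
view.add_done_callback(received.append)
inner.set_result([10, 20])  # callbacks run as part of set_result
```

With this adapter, `received[0].result()` yields the selected item (20) instead of the full list that the underlying future holds.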
executorlib/standalone/split.py (outdated)
    def add_done_callback(self, fn) -> None:
        return self._future.add_done_callback(fn=fn)
🛠️ Refactor suggestion
Ensure callbacks receive the SplitFuture and support proper removal.
Wrap callbacks so fn(self) is called, not fn(underlying_future). Also add remove_done_callback that removes the wrapped function from the underlying future.
-    def add_done_callback(self, fn) -> None:
-        return self._future.add_done_callback(fn=fn)
+    def add_done_callback(self, fn) -> None:
+        def _wrapped(_):
+            fn(self)
+        self._cb_map[fn] = _wrapped
+        self._future.add_done_callback(_wrapped)
+
+    def remove_done_callback(self, fn) -> int:
+        wrapped = self._cb_map.pop(fn, None)
+        if wrapped is None:
+            return 0
+        return self._future.remove_done_callback(wrapped)
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executorlib/standalone/split.py around lines 23-24, the add_done_callback
currently forwards the underlying Future's callback so callers receive the
underlying future instead of the SplitFuture and there's no way to remove the
wrapped callback; change add_done_callback to wrap the provided fn in a small
adapter that calls fn(self) when the underlying future completes, register that
adapter via self._future.add_done_callback, and store a mapping (e.g.,
self._callback_map[fn] = adapter or allow multiple by using a list) so you can
later remove it; implement remove_done_callback(fn) to look up the adapter(s),
call self._future.remove_done_callback(adapter) for each stored adapter, clean
them from the map, and return the number of removed callbacks (matching
underlying Future.remove_done_callback behavior).
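One caveat when acting on this prompt: the standard-library concurrent.futures.Future does not define remove_done_callback (that method exists on asyncio.Future), so a wrapper around a concurrent.futures future cannot delegate removal. A possible workaround, sketched here with a hypothetical RemovableCallbackView class, is to keep the list of still-active callbacks on the wrapper and let each adapter check it before firing. The sketch is not thread-safe; a real implementation would likely guard the list with a lock.

```python
from concurrent.futures import Future


class RemovableCallbackView:
    """Hypothetical wrapper emulating callback removal on top of a
    concurrent.futures.Future, which offers no removal API itself."""

    def __init__(self, future: Future):
        self._future = future
        self._callbacks = []  # user callbacks that are still registered

    def add_done_callback(self, fn) -> None:
        self._callbacks.append(fn)

        def _adapter(_underlying):
            # Fire only if a registration for fn is still active.
            if fn in self._callbacks:
                self._callbacks.remove(fn)
                fn(self)

        self._future.add_done_callback(_adapter)

    def remove_done_callback(self, fn) -> int:
        # Return the number of registrations dropped, as asyncio.Future does.
        removed = self._callbacks.count(fn)
        self._callbacks = [cb for cb in self._callbacks if cb != fn]
        return removed
```

The adapters stay attached to the underlying future, but a removed callback is never invoked because its registration is gone by the time the adapter runs.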
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #819      +/-   ##
==========================================
+ Coverage   98.12%   98.16%   +0.03%
==========================================
  Files          33       34       +1
  Lines        1650     1685      +35
==========================================
+ Hits         1619     1654      +35
  Misses         31       31
Actionable comments posted: 1
♻️ Duplicate comments (1)
executorlib/standalone/split.py (1)
23-24: Fix callback semantics to pass the SplitFuture instance and support proper removal (prevents breakage with as_completed()).
Currently, callbacks receive the underlying Future and remove_done_callback on the wrapper is ineffective. Wrap callbacks so they get self and track/remove them on the underlying future.
Apply these diffs:
@@
-from typing import Any, Optional
+from typing import Any, Optional, Callable
@@
 class SplitFuture(Future):
     def __init__(self, future: Future, selector: int):
         super().__init__()
         self._future = future
         self._selector = selector
+        # Map original callback -> list of wrapped callbacks registered on underlying
+        self._cb_map: dict[Callable, list[Callable]] = {}
@@
-    def add_done_callback(self, fn) -> None:
-        return self._future.add_done_callback(fn=fn)
+    def add_done_callback(self, fn) -> None:
+        def _wrapped(_):
+            fn(self)
+        self._cb_map.setdefault(fn, []).append(_wrapped)
+        self._future.add_done_callback(_wrapped)
+
+    def remove_done_callback(self, fn) -> int:
+        wrappers = self._cb_map.pop(fn, [])
+        removed = 0
+        for w in wrappers:
+            removed += self._future.remove_done_callback(w)
+        return removed
Also applies to: 6-9, 1-2
🧹 Nitpick comments (5)
executorlib/standalone/split.py (3)
26-27: Harden result() with clearer errors for bad selector or non-indexable results.
-    def result(self, timeout: Optional[float] = None) -> Any:
-        return self._future.result(timeout=timeout)[self._selector]
+    def result(self, timeout: Optional[float] = None) -> Any:
+        res = self._future.result(timeout=timeout)
+        try:
+            return res[self._selector]
+        except Exception as e:  # IndexError/TypeError most likely
+            raise type(e)(
+                f"SplitFuture selector {self._selector} failed on result of type {type(res).__name__}: {e}"
+            ) from e
42-43: Validate input and add return type for split().
-def split(future: Future, n: int):
-    return [SplitFuture(future=future, selector=i) for i in range(n)]
+def split(future: Future, n: int) -> list[SplitFuture]:
+    if n < 0:
+        raise ValueError("n must be >= 0")
+    return [SplitFuture(future=future, selector=i) for i in range(n)]
35-39: Confirm intention to expose mutating methods on the wrapper.
Delegating set_running_or_notify_cancel/set_result/set_exception gives callers the ability to mutate the underlying Future via the wrapper. If the API should be read-only, override these to raise RuntimeError instead.
Also applies to: 32-34
tests/test_standalone_split.py (2)
12-14: Silence lint: unused argument.
-def function_with_exception(i):
+def function_with_exception(_):
     raise RuntimeError()
33-40: Also assert all split futures propagate the exception.
             future = exe.submit(function_with_exception, 15)
             f1, f2, f3 = split(future=future, n=3)
-            with self.assertRaises(RuntimeError):
-                f3.result()
+            for f in (f1, f2, f3):
+                with self.assertRaises(RuntimeError):
+                    f.result()
📒 Files selected for processing (3)
executorlib/api.py (2 hunks)
executorlib/standalone/split.py (1 hunks)
tests/test_standalone_split.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
executorlib/api.py (1)
executorlib/standalone/split.py (1)
split (42-43)
tests/test_standalone_split.py (3)
executorlib/executor/single.py (1)
SingleNodeExecutor (20-190)
executorlib/standalone/serialize.py (1)
cloudpickle_register (9-28)
executorlib/standalone/split.py (12)
split (42-43), SplitFuture (5-39), result (26-27), done (20-21), add_done_callback (23-24), set_running_or_notify_cancel (32-33), running (17-18), set_result (35-36), cancel (11-12), cancelled (14-15), set_exception (38-39), exception (29-30)
🪛 Ruff (0.12.2)
tests/test_standalone_split.py
12-12: Unused function argument: i
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: unittest_old
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: minimal
- GitHub Check: notebooks_integration
🔇 Additional comments (1)
executorlib/api.py (1)
21-21: Public export looks good.
Also applies to: 33-33
Actionable comments posted: 1
♻️ Duplicate comments (1)
tests/test_standalone_split.py (1)
20-22: Stop printing in tests; assert the callback was invoked and clarify target.
Current callback prints and doesn’t assert anything. Also, by design the callback receives the underlying Future, not the SplitFuture. Make this explicit and test it.
-def callback(future):
-    print("callback:", future.result())
+# (removed: use local callback inside the test)
     def test_split_future_object(self):
         f1 = Future()
         fs1 = SplitFuture(future=f1, selector=1)
-        fs1.add_done_callback(callback)
+        seen = []
+        def cb(f):
+            # SplitFuture.add_done_callback delegates to the underlying future.
+            seen.append(f is f1)
+        fs1.add_done_callback(cb)
         fs1.set_running_or_notify_cancel()
         self.assertTrue(fs1.running())
         fs1.set_result([1, 2])
         self.assertEqual(fs1.result(), 2)
+        self.assertTrue(all(seen))
If you prefer the callback to receive the SplitFuture instead, adjust the implementation accordingly (see separate suggestion below).
Also applies to: 45-52
🧹 Nitpick comments (4)
tests/test_standalone_split.py (4)
16-17: Rename unused parameter to underscore to satisfy Ruff (ARG001).The argument isn’t used; keep the call shape but mark it intentionally unused.
-def function_with_exception(i): +def function_with_exception(_): raise RuntimeError()
45-63: Optional: Assert cancellation/exception semantics more thoroughly.
- Verify cancellation is visible across all wrappers of the same underlying Future.
- Prefer isinstance over type equality for exceptions.
         f2 = Future()
         fs2 = SplitFuture(future=f2, selector=1)
         fs2.cancel()
         self.assertTrue(fs2.cancelled())
+        fs2_other = SplitFuture(future=f2, selector=0)
+        self.assertTrue(fs2_other.cancelled())
         f3 = Future()
         fs3 = SplitFuture(future=f3, selector=1)
         fs3.set_exception(RuntimeError())
-        self.assertEqual(type(fs3.exception()), RuntimeError)
+        self.assertIsInstance(fs3.exception(), RuntimeError)
         with self.assertRaises(RuntimeError):
             fs3.result()
37-44: Also assert exception propagation to all split futures.All splits should surface the underlying exception.
with self.assertRaises(RuntimeError): f3.result() + with self.assertRaises(RuntimeError): + f1.result() + with self.assertRaises(RuntimeError): + f2.result()
12-14: Exercise string-key selection (dict results) to cover SplitFuture(selector: str).This function is currently unused. Add a focused unit test for key-based selection.
 class TestSplitFuture(unittest.TestCase):
@@
     def test_split_future_object(self):
         ...
         with self.assertRaises(RuntimeError):
             fs3.result()
+
+    def test_split_future_object_dict_selector(self):
+        f = Future()
+        fs = SplitFuture(future=f, selector="c")
+        fs.set_result({"a": 1, "b": 2, "c": 42})
+        self.assertEqual(fs.result(), 42)
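Key-based selection can also be expressed without subclass delegation, by chaining a fresh Future off the source with add_done_callback. This is an alternative design shown only for comparison, not what the PR implements; item_future is a hypothetical helper name.

```python
from concurrent.futures import Future


def item_future(source: Future, key) -> Future:
    """Return a new future that completes with source.result()[key]."""
    out = Future()

    def _forward(done: Future) -> None:
        try:
            out.set_result(done.result()[key])
        except BaseException as exc:
            # Covers task errors, cancellation of the source, and bad keys.
            out.set_exception(exc)

    source.add_done_callback(_forward)
    return out


f = Future()
selected = item_future(f, "c")
f.set_result({"a": 1, "b": 2, "c": 42})
value = selected.result()
```

Because the returned object is an ordinary Future, callbacks registered on it receive that future itself. The trade-off is that cancelling it does not cancel the source.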
📒 Files selected for processing (2)
executorlib/standalone/split.py (1 hunks)
tests/test_standalone_split.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/standalone/split.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_standalone_split.py (3)
executorlib/executor/single.py (1)
SingleNodeExecutor (20-190)
executorlib/standalone/serialize.py (1)
cloudpickle_register (9-28)
executorlib/standalone/split.py (12)
split (42-43), SplitFuture (5-39), result (26-27), done (20-21), add_done_callback (23-24), set_running_or_notify_cancel (32-33), running (17-18), set_result (35-36), cancel (11-12), cancelled (14-15), set_exception (38-39), exception (29-30)
🪛 Ruff (0.12.2)
tests/test_standalone_split.py
16-16: Unused function argument: i
(ARG001)
🔇 Additional comments (1)
tests/test_standalone_split.py (1)
27-27: Confirm cloudpickle_register(ind=1) is correct for this call site.
Default is 2; if 1 is intentional for test modules, keep it. Otherwise consider reverting to the default.
    def test_split_future_object(self):
        f1 = Future()
        fs1 = SplitFuture(future=f1, selector=1)
        fs1.add_done_callback(callback)
        fs1.set_running_or_notify_cancel()
        self.assertTrue(fs1.running())
        fs1.set_result([1, 2])
        self.assertEqual(fs1.result(), 2)
🛠️ Refactor suggestion
If desired: Change callback semantics to return the SplitFuture and support removal.
Right now, SplitFuture.add_done_callback delegates to the underlying future, so remove_done_callback on the wrapper won’t detach that callback. If you want canonical wrapper semantics (callback gets the wrapper; removal works), update SplitFuture as below.
Apply in executorlib/standalone/split.py:
 class SplitFuture(Future):
     def __init__(self, future: Future, selector: int | str):
         super().__init__()
         self._future = future
         self._selector = selector
+        self._cb_map: dict = {}
-    def add_done_callback(self, fn) -> None:
-        return self._future.add_done_callback(fn=fn)
+    def add_done_callback(self, fn) -> None:
+        def wrapped(_):
+            fn(self)
+        self._cb_map[fn] = wrapped
+        return self._future.add_done_callback(fn=wrapped)
+
+    def remove_done_callback(self, fn) -> int:
+        wrapped = self._cb_map.pop(fn, None)
+        if wrapped is None:
+            return 0
+        return self._future.remove_done_callback(wrapped)
I can send a follow-up PR adjusting tests to assert these semantics if you choose this route.
🤖 Prompt for AI Agents
In tests/test_standalone_split.py around lines 45-52, the test exposes that
SplitFuture currently delegates add_done_callback to the underlying Future so
remove_done_callback on the wrapper cannot detach the callback and callbacks
receive the inner Future instead of the wrapper; modify
executorlib/standalone/split.py so add_done_callback wraps the user callback in
a small adapter that calls the original with the SplitFuture wrapper (not the
inner future), store a mapping from original callback -> adapter so
remove_done_callback can look up and pass the adapter to the underlying future's
remove_done_callback, and ensure edge cases (callback already removed or not
present) are handled gracefully.
ToDo List:
Summary by CodeRabbit
New Features
Tests