Communication: Use zmq.Poller() rather than waiting infinitely #789
Walkthrough
Replaces blocking recv with a zmq.Poller-based non-blocking receive in SocketInterface, adds ExecutorlibSocketError, adds a time_out_ms constructor parameter, defers logger initialization unless requested, and checks spawner liveness during receive before raising on termination.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller as Caller
participant SI as SocketInterface
participant P as zmq.Poller
participant S as ZMQ PAIR Socket
participant SP as Spawner
Note over SI: __init__ registers S with P, stores time_out_ms, logger deferred
Caller->>SI: receive_dict()
loop poll loop
SI->>P: poll(time_out_ms)
alt POLLIN ready
P-->>SI: ready
SI->>S: recv(NOBLOCK)
S-->>SI: raw bytes
SI->>Caller: deserialize -> dict / raise payload error
else timeout
SI->>SP: poll()
alt Spawner alive
SP-->>SI: alive
Note over SI: continue polling
else Spawner terminated
SI-->>Caller: raise ExecutorlibSocketError
end
end
end
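The receive loop in the diagram can be sketched without a running ZeroMQ context. In this minimal sketch, `FakePoller`, `FakeSocket`, `FakeSpawner`, `SpawnerTerminated`, and `receive_bytes` are hypothetical stand-ins (not executorlib names), and the spawner's `poll()` is assumed to return True while the child is alive.

```python
from collections import deque


class SpawnerTerminated(RuntimeError):
    """Stand-in for ExecutorlibSocketError in this sketch."""


class FakePoller:
    """Hypothetical stand-in for zmq.Poller: poll() returns [] on timeout."""

    def __init__(self, results):
        self._results = deque(results)

    def poll(self, timeout_ms):
        return self._results.popleft() if self._results else []


class FakeSocket:
    """Hypothetical stand-in for a PAIR socket holding one message."""

    def __init__(self, payload):
        self._payload = payload

    def recv(self):
        return self._payload


class FakeSpawner:
    """poll() returns True while the child is alive (assumed bool contract)."""

    def __init__(self, alive):
        self.alive = alive

    def poll(self):
        return self.alive


def receive_bytes(poller, sock, spawner, time_out_ms=1000):
    """Poll until a message is ready; on timeout, check that the child is alive."""
    while True:
        if poller.poll(time_out_ms):  # POLLIN ready
            return sock.recv()
        if not spawner.poll():  # timed out and the child is gone
            raise SpawnerTerminated("Spawner terminated before a message was received.")


# Two timeouts with a live child, then a ready event.
ready = [("socket", 1)]
result = receive_bytes(FakePoller([[], [], ready]), FakeSocket(b"ok"), FakeSpawner(True))
```

With a live child the loop simply keeps polling; only a timeout combined with a dead child raises.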
Estimated code review effort: 3 (Moderate), ~25 minutes
Codecov Report
All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #789      +/-   ##
==========================================
+ Coverage   97.74%   97.75%   +0.01%
==========================================
  Files          33       33
  Lines        1461     1471      +10
==========================================
+ Hits         1428     1438      +10
  Misses         33       33
Actionable comments posted: 2
🧹 Nitpick comments (3)
executorlib/standalone/interactive/communication.py (3)
37-42: Prevent shutdown hangs: set LINGER=0 and unregister before close
Without LINGER=0, closing sockets can block; also unregister closed sockets from the poller.
         self._socket = self._context.socket(zmq.PAIR)
+        self._socket.setsockopt(zmq.LINGER, 0)
         self._poller = zmq.Poller()
         self._poller.register(self._socket, zmq.POLLIN)
@@
-        if self._socket is not None:
-            self._socket.close()
+        if self._socket is not None:
+            try:
+                self._poller.unregister(self._socket)
+            except Exception:
+                pass
+            self._socket.close()
24-26: Surface timeout to callers via interface_bootup
Expose time_out_ms in interface_bootup to avoid hardcoding and enable tuning per workload.
-def interface_bootup(
+def interface_bootup(
     command_lst: list[str],
     connections,
     hostname_localhost: Optional[bool] = None,
     log_obj_size: bool = False,
     worker_id: Optional[int] = None,
+    time_out_ms: int = 1000,
 ) -> SocketInterface:
@@
-        log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        time_out_ms (int): Socket poll timeout in milliseconds.
@@
-    interface = SocketInterface(
-        spawner=connections,
-        log_obj_size=log_obj_size,
-    )
+    interface = SocketInterface(
+        spawner=connections,
+        log_obj_size=log_obj_size,
+        time_out_ms=time_out_ms,
+    )
32-33: Docstring type names: use bool instead of “boolean”
Keep types consistent with Python typing in docstrings.
-        log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
-        time_out_ms (int): Time out for waiting for a message on socket in milliseconds.
+        time_out_ms (int): Timeout for waiting for a message on the socket in milliseconds.
Additionally (same file, other docstrings):
-        log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
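The timeout nitpick amounts to forwarding one keyword argument from the factory function to the constructor. A minimal sketch of that pattern, where `InterfaceSketch` and `bootup_sketch` are hypothetical stand-ins for `SocketInterface` and `interface_bootup`, and the default of 1000 ms is taken from the suggested diff:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class InterfaceSketch:
    """Hypothetical stand-in for SocketInterface; it only stores its settings."""

    spawner: Optional[object] = None
    log_obj_size: bool = False
    time_out_ms: int = 1000


def bootup_sketch(
    connections,
    log_obj_size: bool = False,
    time_out_ms: int = 1000,
) -> InterfaceSketch:
    """Forward time_out_ms instead of relying on the constructor default."""
    return InterfaceSketch(
        spawner=connections,
        log_obj_size=log_obj_size,
        time_out_ms=time_out_ms,
    )


default_iface = bootup_sketch(connections=None)
fast_iface = bootup_sketch(connections=None, time_out_ms=250)
```

Callers that never pass the argument keep the previous behavior, while tests or latency-sensitive workloads can shorten the poll interval.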
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
executorlib/standalone/interactive/communication.py(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_win
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks
        response_lst: list[tuple[Any, int]] = []
        while len(response_lst) == 0:
            response_lst = self._poller.poll(self._time_out_ms)
            if not self._spawner.poll():
                raise ExecutorlibSockerError()
        data = self._socket.recv(zmq.NOBLOCK)
💡 Verification agent
🧩 Analysis chain
Fix poll/recv race and avoid false-negative termination
Current code may raise on a legit event if the child exits right after sending. Only check spawner status when no events; drop NOBLOCK after a POLLIN to avoid EAGAIN; guard spawner None; add a message.
-        response_lst: list[tuple[Any, int]] = []
-        while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
-            if not self._spawner.poll():
-                raise ExecutorlibSockerError()
-        data = self._socket.recv(zmq.NOBLOCK)
+        while True:
+            events: list[tuple[zmq.Socket, int]] = self._poller.poll(self._time_out_ms)
+            if events:
+                try:
+                    data = self._socket.recv()  # ready to read, no DONTWAIT needed
+                    break
+                except zmq.Again:
+                    # rare spurious wakeup; keep waiting
+                    continue
+            if self._spawner is not None and not self._spawner.poll():
+                raise ExecutorlibSocketError("Spawner terminated before a message was received.")
To double-check BaseSpawner semantics (that poll() returns False when dead):
🏁 Script executed:
#!/bin/bash
# Inspect spawner implementations to validate the poll() contract.
rg -nP 'class\s+BaseSpawner\b|class\s+\w*Spawner\b|def\s+poll\s*\(' -C3 --type=py
Length of output: 4280
🏁 Script executed:
#!/bin/bash
# Inspect SubprocessSpawner.poll implementation
sed -n '130,150p' executorlib/standalone/interactive/spawner.py
# Inspect communication class initializer for _spawner assignment
sed -n '1,80p' executorlib/standalone/interactive/communication.py
# Search for exception definitions
rg -n "ExecutorlibSockerError" executorlib
rg -n "ExecutorlibSocketError" executorlib
Length of output: 3883
Fix poll/recv race and avoid false-negative termination
Only check the spawner status when no events; remove NOBLOCK after POLLIN to avoid spurious EAGAIN; guard against a None _spawner; use the existing ExecutorlibSockerError class.
-        response_lst: list[tuple[Any, int]] = []
-        while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
-            if not self._spawner.poll():
-                raise ExecutorlibSockerError()
-        data = self._socket.recv(zmq.NOBLOCK)
+        while True:
+            events = self._poller.poll(self._time_out_ms)
+            if events:
+                try:
+                    data = self._socket.recv()  # data available, no DONTWAIT needed
+                    break
+                except zmq.Again:
+                    continue
+            if self._spawner is not None and not self._spawner.poll():
+                raise ExecutorlibSockerError("Spawner terminated before a message was received.")
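The race described in this comment can be reproduced with stubs: the child queues its reply and exits before the parent polls. The sketch assumes the spawner's `poll()` returns False once the child is dead; `ChildExited`, `OneShotPoller`, `QueuedSocket`, `DeadSpawner`, and both `receive_*` functions are hypothetical names used only for illustration.

```python
class ChildExited(RuntimeError):
    """Stand-in for the socket error raised in the pull request."""


class OneShotPoller:
    """Hypothetical poller: the message is already queued on the first poll."""

    def poll(self, timeout_ms):
        return [("socket", 1)]


class QueuedSocket:
    """Hypothetical socket that always has the child's reply ready."""

    def recv(self):
        return b"result"


class DeadSpawner:
    """The child sent its reply and then exited, so poll() reports False."""

    def poll(self):
        return False


def receive_original(poller, sock, spawner, time_out_ms=1000):
    events = []
    while len(events) == 0:
        events = poller.poll(time_out_ms)
        if not spawner.poll():  # checked even when a message is waiting
            raise ChildExited()
    return sock.recv()


def receive_suggested(poller, sock, spawner, time_out_ms=1000):
    while True:
        if poller.poll(time_out_ms):
            return sock.recv()  # read the queued message first
        if spawner is not None and not spawner.poll():
            raise ChildExited("Spawner terminated before a message was received.")


try:
    receive_original(OneShotPoller(), QueuedSocket(), DeadSpawner())
    original_outcome = "returned"
except ChildExited:
    original_outcome = "raised"

suggested_outcome = receive_suggested(OneShotPoller(), QueuedSocket(), DeadSpawner())
```

The original ordering discards a reply that was already delivered, whereas checking liveness only after a timeout returns it.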
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
executorlib/standalone/interactive/communication.py (1)
128-133: Guard shutdown when no spawner is attached
shutdown() and __del__ will raise AttributeError if spawner is None. Add a None-check.
-        if self._spawner.poll():
+        if self._spawner is not None and self._spawner.poll():
             result = self.send_and_receive_dict(
                 input_dict={"shutdown": True, "wait": wait}
             )
             self._spawner.shutdown(wait=wait)
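A minimal sketch of the None-guard, with the socket handshake left out. `SpawnerStub` and `InterfaceStub` are hypothetical classes, and the boolean return value exists only so the two paths can be told apart here.

```python
class SpawnerStub:
    """Hypothetical spawner exposing the poll()/shutdown() pair from the review."""

    def __init__(self):
        self.running = True

    def poll(self):
        return self.running

    def shutdown(self, wait=True):
        self.running = False


class InterfaceStub:
    """Sketch of the shutdown path only; sockets are omitted."""

    def __init__(self, spawner=None):
        self._spawner = spawner

    def shutdown(self, wait=True):
        # Skip the handshake when no spawner is attached or it already exited.
        if self._spawner is not None and self._spawner.poll():
            self._spawner.shutdown(wait=wait)
            return True
        return False


without_spawner = InterfaceStub().shutdown()
stub = SpawnerStub()
with_spawner = InterfaceStub(stub).shutdown(wait=False)
```

Without the `is not None` check, the first call would fail with AttributeError on `None.poll()`.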
♻️ Duplicate comments (1)
executorlib/standalone/interactive/communication.py (1)
66-72: Fix poll/recv race; remove NOBLOCK and guard spawner None
Calling recv(NOBLOCK) right after POLLIN can still raise Again; also raising on spawner death even when events exist is a false negative. Only check spawner when no events; drop NOBLOCK; add message; guard None.
-        response_lst: list[tuple[Any, int]] = []
-        while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
-            if not self._spawner.poll():
-                raise ExecutorlibSocketError()
-        data = self._socket.recv(zmq.NOBLOCK)
+        while True:
+            events: list[tuple[zmq.Socket, int]] = self._poller.poll(self._time_out_ms)
+            if events:
+                try:
+                    data = self._socket.recv()  # ready to read
+                    break
+                except zmq.Again:
+                    continue  # rare spurious wakeup
+            if self._spawner is not None and not self._spawner.poll():
+                raise ExecutorlibSocketError("Spawner terminated before a message was received.")
🧹 Nitpick comments (5)
executorlib/standalone/interactive/communication.py (5)
10-11: Document the public exception for clarity
Add a brief class docstring; raising site already updated in the recv loop diff to include a message.
 class ExecutorlibSocketError(RuntimeError):
-    pass
+    """Raised when the spawner terminates early or socket communication fails."""
+    pass
4-4: Drop unused Any import
After the recv-loop change, Any is no longer needed.
-from typing import Any, Optional
+from typing import Optional
21-21: Tighten docstrings: clarify units and default
Make the timeout description explicit and consistent.
-        time_out_ms (int): Time out for waiting for a message on socket in milliseconds.
+        time_out_ms (int): Timeout in milliseconds for waiting on the socket. Default: 1000.

-        time_out_ms (int): Time out for waiting for a message on socket in milliseconds.
+        time_out_ms (int): Timeout in milliseconds for waiting on the socket. Default: 1000.
Also applies to: 32-33
150-156: Plumb time_out_ms through interface_bootup
Expose and pass the new timeout so callers of interface_bootup can configure it.
 def interface_bootup(
     command_lst: list[str],
     connections,
     hostname_localhost: Optional[bool] = None,
     log_obj_size: bool = False,
-    worker_id: Optional[int] = None,
+    worker_id: Optional[int] = None,
+    time_out_ms: int = 1000,
 ) -> SocketInterface:

-        log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
+        log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
+        time_out_ms (int): Timeout in milliseconds for waiting on the socket. Default: 1000.

-    interface = SocketInterface(
-        spawner=connections,
-        log_obj_size=log_obj_size,
-    )
+    interface = SocketInterface(
+        spawner=connections,
+        log_obj_size=log_obj_size,
+        time_out_ms=time_out_ms,
+    )
Also applies to: 171-171, 187-190
175-177: Fix return-type docstring path
Module path is inaccurate; update to this module.
-        executorlib.shared.communication.SocketInterface: socket interface for zmq communication
+        executorlib.standalone.interactive.communication.SocketInterface: socket interface for zmq communication
📒 Files selected for processing (1)
executorlib/standalone/interactive/communication.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
executorlib/standalone/interactive/communication.py (1)
executorlib/standalone/interactive/spawner.py (2)
poll (52-59)
poll (131-138)
🔇 Additional comments (2)
executorlib/standalone/interactive/communication.py (2)
37-38: Poller registration looks correct
Registering the PAIR socket for POLLIN on a dedicated Poller is the right approach.
24-26: Constructor change LGTM
Optional timeout parameter with sane default; logger deferral is good.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_standalone_interactive_communication.py (1)
118-146: Guard against CI hangs and ensure deterministic failure.
- Add a small time_out_ms to the interface for faster failure.
- Wait for the terminated process to actually exit to avoid race conditions.
- Always clean up sockets/contexts via addCleanup, even on failure.
Apply:
     def test_interface_serial_with_stopped_process(self):
         cloudpickle_register(ind=1)
         task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
         interface = SocketInterface(
             spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False),
             log_obj_size=True,
+            time_out_ms=250,
         )
+        self.addCleanup(lambda: interface.shutdown(wait=False))
         interface.bootup(
             command_lst=[
                 sys.executable,
                 os.path.abspath(
                     os.path.join(
                         __file__,
                         "..",
                         "..",
                         "executorlib",
                         "backend",
                         "interactive_serial.py",
                     )
                 ),
                 "--zmqport",
                 str(interface.bind_to_random_port()),
             ]
         )
         interface.send_dict(input_dict=task_dict)
         interface._spawner._process.terminate()
+        interface._spawner._process.wait(timeout=5)
         with self.assertRaises(ExecutorlibSocketError):
             interface.receive_dict()
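The two process-related suggestions (register cleanup early, wait for the terminated child) can be illustrated with the standard library alone. This is a sketch using a plain `subprocess.Popen` child rather than executorlib's spawner; `TerminatedChildTest` is a hypothetical test case.

```python
import subprocess
import sys
import unittest


class TerminatedChildTest(unittest.TestCase):
    def test_child_is_reaped_before_assertion(self):
        # A child that would otherwise sleep far longer than the test.
        proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
        # Registered right away, so it runs even if a later step fails.
        self.addCleanup(proc.kill)
        proc.terminate()
        # Block until the exit is reported, so poll() below is deterministic.
        proc.wait(timeout=5)
        self.assertIsNotNone(proc.poll())


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TerminatedChildTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Without the `wait(timeout=5)` call, `poll()` may still report a running child immediately after `terminate()`, which is the race the review points at.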
📒 Files selected for processing (1)
tests/test_standalone_interactive_communication.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_standalone_interactive_communication.py (2)
executorlib/standalone/interactive/communication.py (3)
ExecutorlibSocketError (10-11)
send_dict (46-57)
receive_dict (59-80)
executorlib/standalone/serialize.py (1)
cloudpickle_register (9-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: mypy
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks
- GitHub Check: unittest_win
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: notebooks_integration
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
🔇 Additional comments (1)
tests/test_standalone_interactive_communication.py (1)
15-16: Importing the public exception is correct.
Good to assert on the explicit, public ExecutorlibSocketError.
    def test_interface_serial_with_stopped_process(self):
        cloudpickle_register(ind=1)
        task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
        interface = SocketInterface(
            spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False),
            log_obj_size=True,
        )
        interface.bootup(
            command_lst=[
                sys.executable,
                os.path.abspath(
                    os.path.join(
                        __file__,
                        "..",
                        "..",
                        "executorlib",
                        "backend",
                        "interactive_serial.py",
                    )
                ),
                "--zmqport",
                str(interface.bind_to_random_port()),
            ]
        )
        interface.send_dict(input_dict=task_dict)
        interface._spawner._process.terminate()
        with self.assertRaises(ExecutorlibSocketError):
            interface.receive_dict()
💡 Verification agent
🧩 Analysis chain
Likely inverted liveness check in receive_dict; test may mask the bug.
receive_dict() currently raises when the spawner is alive and may loop forever when it exited with a non-zero code (common after terminate()). This can cause this test to hang instead of raising, or to raise for the wrong reason.
- Proposed fix (outside this file) in executorlib/standalone/interactive/communication.py:
     def receive_dict(self) -> dict:
         """
         Receive a dictionary from a connected client process.
         """
-        response_lst: list[tuple[Any, int]] = []
-        while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
-            if not self._spawner.poll():
-                raise ExecutorlibSocketError()
+        response_lst: list[tuple[Any, int]] = []
+        while True:
+            # If the spawner exited (any return code), fail fast.
+            if self._spawner.poll() is not None:
+                raise ExecutorlibSocketError("Spawner terminated while waiting for response")
+            response_lst = self._poller.poll(self._time_out_ms)
+            if response_lst:
+                break
         data = self._socket.recv(zmq.NOBLOCK)
- Verify the current logic and ensure the fix is applied:
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the receive_dict implementation and nearby lines
rg -n -C3 $'def\\s+receive_dict\\b' executorlib/standalone/interactive/communication.py
# Detect the inverted check; this should return no matches after the fix
rg -n $'if\\s+not\\s+self\\._spawner\\.poll\\(\\)\\s*:' executorlib/standalone/interactive/communication.py || true
Length of output: 506
Fix inverted liveness check in receive_dict
executorlib/standalone/interactive/communication.py:69 still uses
    if not self._spawner.poll():
        raise ExecutorlibSocketError()
This is inverted: it hangs when the spawner exits. Change to:
    if self._spawner.poll() is not None:
        raise ExecutorlibSocketError("Spawner terminated while waiting for response")
to fail fast on termination.
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/communication.py around line 69, the
liveness check is inverted: currently it raises when poll() is falsy and thus
hangs when the spawner exits; change the condition to check for termination
using if self._spawner.poll() is not None and raise ExecutorlibSocketError with
a clear message like "Spawner terminated while waiting for response" so the code
fails fast when the spawner has exited.
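Whether `if not self._spawner.poll()` is inverted depends on what the spawner's `poll()` returns. The earlier analysis in this thread states that it returns False once the child is dead, whereas `subprocess.Popen.poll()` returns None while the child is running and the return code afterwards. A small sketch contrasting the two conventions, where `is_running` is a hypothetical wrapper and not executorlib code:

```python
import subprocess
import sys


def is_running(process: subprocess.Popen) -> bool:
    """Bool-style liveness check, as a spawner wrapper might expose it."""
    # Popen.poll() returns None while the child runs, else its return code.
    return process.poll() is None


proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
alive_before = is_running(proc)
proc.terminate()
proc.wait(timeout=5)
alive_after = is_running(proc)
return_code = proc.poll()

# With a bool-returning poll(), "not poll()" means the child has exited.
bool_style_dead = not alive_after
# With Popen.poll(), "poll() is not None" means the same thing.
popen_style_dead = return_code is not None
```

Under the bool convention, `poll() is not None` would be true for a live child as well, so the proposed condition only fits a spawner that passes through the raw `Popen.poll()` result.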