Skip to content

Commit

Permalink
Fix instability in the ros2 daemon. (#947)
Browse files Browse the repository at this point in the history
The original problem here is that running the test_strategy.py test
in the nightly repeated jobs "sometimes" fails.

There have been a few attempts to fix flakiness in the ros2 daemon in the past.
These include #620 , 
#622 , and
#652 .
These all changed things in various ways, but the key PR was #652, which made
spawning the daemon a reliable operation.

#622 made some changes to change the sockets to add SO_LINGER with a zero timeout.
That improved, but did not totally solve the situation. It also has its own downsides, as
SO_LINGER doesn't gracefully terminate connections and instead just sends RST on the
socket and terminates it.

To fix this for real requires 3 parts in this commit, though one of the parts is platform-dependent:

1.  When the daemon is exiting cleanly, it should explicitly shutdown the socket that it was using
for the XMLRPC server. That will cleanly shutdown the socket, and tell the kernel it can start the
cleanup. On its own, this does not completely solve the problem, but it reduces the amount of time
that things are hanging about waiting for the Python interpreter and/or the kernel to implicitly
clean things up.
2.  We should not specify SO_LINGER on the daemon sockets. As mentioned above, this is actually
something of an anti-pattern and does not properly terminate connections with FIN (it just sends RST).
3.  We should specify SO_REUSEADDR, but only on Unix. On Unix, SO_REUSEADDR essentially means
"allow binding to an address/port that is in TCP TIME_WAIT (but not that is otherwise in use)".
This is exactly the behavior we want. On Windows, SO_REUSEADDR causes undefined behavior, as
it can cause a socket to bind even if there is something else bound already. Because of that, we want
to set SO_REUSEADDR on Unix, but not Windows.

Finally, while testing here I had to add in one bugfix to make things reliable on Windows, which is to
also catch ConnectionResetError. That arises because we can attempt to "connect" to a daemon that
is in the process of shutting down. In that case, we should also consider the daemon not "connected".

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
  • Loading branch information
clalancette authored Nov 20, 2024
1 parent 8766c33 commit e998af3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
7 changes: 7 additions & 0 deletions ros2cli/ros2cli/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ def shutdown_handler():
pass


def serve_and_close(server: LocalXMLRPCServer, *, timeout: int = 2 * 60 * 60):
try:
serve(server, timeout=timeout)
finally:
server.server_close()


def main(*, argv=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
Expand Down
4 changes: 2 additions & 2 deletions ros2cli/ros2cli/node/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def connected(self):
for method in self._proxy.system.listMethods()
if not method.startswith('system.')
]
except ConnectionRefusedError:
except (ConnectionRefusedError, ConnectionResetError):
return False
return True

Expand Down Expand Up @@ -168,7 +168,7 @@ def spawn_daemon(args, timeout=None, debug=False):
'rmw_implementation': rclpy.get_rmw_implementation_identifier()}

daemonize(
functools.partial(daemon.serve, server),
functools.partial(daemon.serve_and_close, server),
tags=tags, timeout=timeout, debug=debug)
finally:
server.server_close()
Expand Down
22 changes: 7 additions & 15 deletions ros2cli/ros2cli/xmlrpc/local_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import socket
import struct
# Import SimpleXMLRPCRequestHandler to re-export it.
from xmlrpc.server import SimpleXMLRPCRequestHandler # noqa
from xmlrpc.server import SimpleXMLRPCServer
Expand All @@ -32,20 +32,12 @@ def get_local_ipaddrs():

class LocalXMLRPCServer(SimpleXMLRPCServer):

allow_reuse_address = False

def server_bind(self):
# Prevent listening socket from lingering in TIME_WAIT state after close()
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
super(LocalXMLRPCServer, self).server_bind()

def get_request(self):
# Prevent accepted socket from lingering in TIME_WAIT state after close()
sock, addr = super(LocalXMLRPCServer, self).get_request()
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
return sock, addr
# Allow re-binding even if another server instance was recently bound (i.e. we are still in
# TCP TIME_WAIT). This is already the default behavior on Windows, and further SO_REUSEADDR can
# lead to undefined behavior on Windows; see
# https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse. # noqa
# So we don't set the option for Windows.
allow_reuse_address = False if os.name == 'nt' else True

def verify_request(self, request, client_address):
if client_address[0] not in get_local_ipaddrs():
Expand Down

0 comments on commit e998af3

Please sign in to comment.