diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index c61ad8172..3a39369ef 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -1,7 +1,7 @@ import fnmatch from threading import Lock import time - +from rclpy.duration import Duration from rosbridge_library.internal.ros_loader import get_service_class from rosbridge_library.internal import message_conversion from rosbridge_library.capability import Capability @@ -28,7 +28,7 @@ def next_id(self): self.id_counter += 1 return id - def handle_request(self, req): + def handle_request(self, req, res): with self.lock: self.active_requests += 1 # generate a unique ID @@ -74,8 +74,8 @@ def graceful_shutdown(self, timeout): """ with self.lock: self.shutdown_requested = True - start_time = time.clock() - while time.clock() - start_time < timeout: + start_time = self.protocol.node_handle.get_clock().now() + while self.protocol.node_handle.get_clock().now() - start_time < Duration(nanoseconds = timeout * 1e9): time.sleep(0) self.protocol.node_handle.destroy_service(self.service_handle) @@ -115,7 +115,7 @@ def advertise_service(self, message): # check for an existing entry if service_name in self.protocol.external_service_list.keys(): self.protocol.log("warn", "Duplicate service advertised. Overwriting %s." % service_name) - self.protocol.external_service_list[service_name].service_handle.shutdown("Duplicate advertiser.") + self.protocol.external_service_list[service_name].graceful_shutdown(0.1) del self.protocol.external_service_list[service_name] # setup and store the service information diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index 119f6bb6b..011a101b1 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -119,12 +119,7 @@ def call_service(node_handle, service, args=None): client = node_handle.create_client(service_class, service) - future = client.call_async(inst) - spin_until_future_complete(node_handle, future) - if future.result() is not None: - # Turn the response into JSON and pass to the callback - json_response = extract_values(future.result()) - else: - raise Exception(future.exception()) + response = client.call(inst) + json_response = extract_values(response) return json_response diff --git a/rosbridge_server/scripts/rosbridge_websocket.py b/rosbridge_server/scripts/rosbridge_websocket.py index 5fb16b7b0..a88947fc8 100755 --- a/rosbridge_server/scripts/rosbridge_websocket.py +++ b/rosbridge_server/scripts/rosbridge_websocket.py @@ -35,10 +35,11 @@ import sys import time +import signal from socket import error -from threading import Thread +from threading import Thread, Lock from tornado.ioloop import IOLoop from tornado.ioloop import PeriodicCallback from tornado.web import Application @@ -58,19 +59,12 @@ from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService from rosbridge_library.capabilities.call_service import CallService -def start_hook(): - IOLoop.instance().start() - -def shutdown_hook(): - IOLoop.instance().stop() - - class RosbridgeWebsocketNode(Node): - def __init__(self): + def __init__(self, io_loop_instance): super().__init__('rosbridge_websocket') RosbridgeWebSocket.node_handle = self - + RosbridgeWebSocket.io_loop_instance = io_loop_instance ################################################## # Parameter handling # ################################################## @@ -292,20 +286,37 @@ def __init__(self): "Retrying in {}s.".format(e, retry_startup_delay)) time.sleep(retry_startup_delay) +class ROSThreadManager: + def __init__(self, args, node): + self.is_node_running = True + self.node = node + self.ros_thread = Thread(target=self.ros_spin) + self.ros_thread.start() + def ros_spin(self): + rclpy.logging.get_logger("thread_manager").info("ros thread started") + while self.is_node_running: + rclpy.spin_once(self.node, timeout_sec=0.01) + rclpy.logging.get_logger("thread_manager").info("ros thread terminatted") + def stop_thread(self): + self.is_node_running = False + self.ros_thread.join() def main(args=None): if args is None: args = sys.argv - rclpy.init(args=args) - rosbridge_websocket_node = RosbridgeWebsocketNode() + # add SIGINT termination + signal.signal(signal.SIGINT, lambda sig, frame: io_loop.add_callback_from_signal(io_loop.stop)) - spin_callback = PeriodicCallback(lambda: rclpy.spin_once(rosbridge_websocket_node, timeout_sec=0.01), 1) - spin_callback.start() - start_hook() + rclpy.init(args=args) + io_loop = IOLoop.instance() + node = RosbridgeWebsocketNode(io_loop) + manager = ROSThreadManager(args, node) + io_loop.start() + # after SIGINT recieved + manager.stop_thread() node.destroy_node() rclpy.shutdown() - shutdown_hook() # shutdown hook to stop the server if __name__ == '__main__': main() diff --git a/rosbridge_server/src/rosbridge_server/websocket_handler.py b/rosbridge_server/src/rosbridge_server/websocket_handler.py index 9916e8c2e..070473206 100755 --- a/rosbridge_server/src/rosbridge_server/websocket_handler.py +++ b/rosbridge_server/src/rosbridge_server/websocket_handler.py @@ -86,7 +86,7 @@ class RosbridgeWebSocket(WebSocketHandler): unregister_timeout = 10.0 # seconds bson_only_mode = False node_handle = None - + io_loop_instance = None @log_exceptions def open(self): @@ -189,7 +189,7 @@ def send_message(self, message): binary = False with self._write_lock: - IOLoop.instance().add_callback(partial(self.prewrite_message, message, binary)) + self.io_loop_instance.add_callback(partial(self.prewrite_message, message, binary)) @coroutine def prewrite_message(self, message, binary):