Skip to content

Commit

Permalink
thread implementation
Browse files Browse the repository at this point in the history
Signed-off-by: mitsudome-r <ryohsuke.mitsudome@tier4.jp>
  • Loading branch information
mitsudome-r committed Mar 16, 2021
1 parent eb5e03d commit d3414b2
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fnmatch
from threading import Lock
import time

from rclpy.duration import Duration
from rosbridge_library.internal.ros_loader import get_service_class
from rosbridge_library.internal import message_conversion
from rosbridge_library.capability import Capability
Expand All @@ -28,7 +28,7 @@ def next_id(self):
self.id_counter += 1
return id

def handle_request(self, req):
def handle_request(self, req, res):
with self.lock:
self.active_requests += 1
# generate a unique ID
Expand Down Expand Up @@ -74,8 +74,8 @@ def graceful_shutdown(self, timeout):
"""
with self.lock:
self.shutdown_requested = True
start_time = time.clock()
while time.clock() - start_time < timeout:
start_time = self.protocol.node_handle.get_clock().now()
while self.protocol.node_handle.get_clock().now() - start_time < Duration(nanoseconds = timeout * 1e9):
time.sleep(0)
self.protocol.node_handle.destroy_service(self.service_handle)

Expand Down Expand Up @@ -115,7 +115,7 @@ def advertise_service(self, message):
# check for an existing entry
if service_name in self.protocol.external_service_list.keys():
self.protocol.log("warn", "Duplicate service advertised. Overwriting %s." % service_name)
self.protocol.external_service_list[service_name].service_handle.shutdown("Duplicate advertiser.")
self.protocol.external_service_list[service_name].graceful_shutdown(0.1)
del self.protocol.external_service_list[service_name]

# setup and store the service information
Expand Down
9 changes: 2 additions & 7 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ def call_service(node_handle, service, args=None):

client = node_handle.create_client(service_class, service)

future = client.call_async(inst)
spin_until_future_complete(node_handle, future)
if future.result() is not None:
# Turn the response into JSON and pass to the callback
json_response = extract_values(future.result())
else:
raise Exception(future.exception())
response = client.call(inst)
json_response = extract_values(response)

return json_response
43 changes: 27 additions & 16 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@

import sys
import time
import signal

from socket import error

from threading import Thread
from threading import Thread, Lock
from tornado.ioloop import IOLoop
from tornado.ioloop import PeriodicCallback
from tornado.web import Application
Expand All @@ -58,19 +59,12 @@
from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService
from rosbridge_library.capabilities.call_service import CallService

def start_hook():
IOLoop.instance().start()

def shutdown_hook():
IOLoop.instance().stop()


class RosbridgeWebsocketNode(Node):
def __init__(self):
def __init__(self, io_loop_instance):
super().__init__('rosbridge_websocket')

RosbridgeWebSocket.node_handle = self

RosbridgeWebSocket.io_loop_instance = io_loop_instance
##################################################
# Parameter handling #
##################################################
Expand Down Expand Up @@ -292,20 +286,37 @@ def __init__(self):
"Retrying in {}s.".format(e, retry_startup_delay))
time.sleep(retry_startup_delay)

class ROSThreadManager:
def __init__(self, args, node):
self.is_node_running = True
self.node = node
self.ros_thread = Thread(target=self.ros_spin)
self.ros_thread.start()
def ros_spin(self):
rclpy.logging.get_logger("thread_manager").info("ros thread started")
while self.is_node_running:
rclpy.spin_once(self.node, timeout_sec=0.01)
rclpy.logging.get_logger("thread_manager").info("ros thread terminatted")
def stop_thread(self):
self.is_node_running = False
self.ros_thread.join()
def main(args=None):
if args is None:
args = sys.argv

rclpy.init(args=args)
rosbridge_websocket_node = RosbridgeWebsocketNode()
# add SIGINT termination
signal.signal(signal.SIGINT, lambda sig, frame: io_loop.add_callback_from_signal(io_loop.stop))

spin_callback = PeriodicCallback(lambda: rclpy.spin_once(rosbridge_websocket_node, timeout_sec=0.01), 1)
spin_callback.start()
start_hook()
rclpy.init(args=args)
io_loop = IOLoop.instance()
node = RosbridgeWebsocketNode(io_loop)
manager = ROSThreadManager(args, node)
io_loop.start()

# after SIGINT recieved
manager.stop_thread()
node.destroy_node()
rclpy.shutdown()
shutdown_hook() # shutdown hook to stop the server

if __name__ == '__main__':
main()
4 changes: 2 additions & 2 deletions rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RosbridgeWebSocket(WebSocketHandler):
unregister_timeout = 10.0 # seconds
bson_only_mode = False
node_handle = None

io_loop_instance = None

@log_exceptions
def open(self):
Expand Down Expand Up @@ -189,7 +189,7 @@ def send_message(self, message):
binary = False

with self._write_lock:
IOLoop.instance().add_callback(partial(self.prewrite_message, message, binary))
self.io_loop_instance.add_callback(partial(self.prewrite_message, message, binary))

@coroutine
def prewrite_message(self, message, binary):
Expand Down

0 comments on commit d3414b2

Please sign in to comment.