diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 6d5cc8c92..eefdb889a 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -87,7 +87,7 @@ def unregister(self): """ Unsubscribes this subscription and cleans up resources """ manager.unsubscribe(self.client_id, self.topic) with self.handler_lock: - self.handler.finish() + self.handler.finish(block=False) self.clients.clear() def subscribe(self, sid=None, msg_type=None, throttle_rate=0, diff --git a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py index 00172c864..77052ceb5 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py @@ -80,7 +80,7 @@ def transition(self): else: return QueueMessageHandler(self) - def finish(self): + def finish(self, block=True): pass @@ -98,7 +98,7 @@ def transition(self): else: return QueueMessageHandler(self) - def finish(self): + def finish(self, block=True): pass @@ -115,6 +115,9 @@ def __init__(self, previous_handler): def handle_message(self, msg): with self.c: + if not self.alive: + return + should_notify = len(self.queue) == 0 self.queue.append(msg) if should_notify: @@ -136,14 +139,15 @@ def transition(self): self.c.notify() return self - def finish(self): + def finish(self, block=True): """ If throttle was set to 0, this pushes all buffered messages """ # Notify the thread to finish with self.c: self.alive = False self.c.notify() - self.join() + if block: + self.join() def run(self): while self.alive: diff --git a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py index d31242ab5..9af0f29ae 100755 --- a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py +++ b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py @@ -39,6 +39,7 @@ import threading import traceback from functools import wraps +from collections import deque from autobahn.twisted.websocket import WebSocketServerProtocol from twisted.internet import interfaces, reactor @@ -66,6 +67,48 @@ def wrapper(*args, **kwargs): return wrapper +class IncomingQueue(threading.Thread): + """Decouples incoming messages from the Autobahn thread. + + This mitigates cases where outgoing messages are blocked by incoming, + and vice versa. + """ + def __init__(self, protocol): + threading.Thread.__init__(self) + self.daemon = True + self.queue = deque() + self.protocol = protocol + + self.cond = threading.Condition() + self._finished = False + + def finish(self): + """Clear the queue and do not accept further messages.""" + with self.cond: + self._finished = True + while len(self.queue) > 0: + self.queue.popleft() + self.cond.notify() + + def push(self, msg): + with self.cond: + self.queue.append(msg) + self.cond.notify() + + def run(self): + while True: + with self.cond: + if len(self.queue) == 0 and not self._finished: + self.cond.wait() + + if self._finished: + break + + msg = self.queue.popleft() + + self.protocol.incoming(msg) + + @implementer(interfaces.IPushProducer) class OutgoingValve: """Allows the Autobahn transport to pause outgoing messages from rosbridge. @@ -131,6 +174,8 @@ def onOpen(self): } try: self.protocol = RosbridgeProtocol(cls.client_id_seed, parameters=parameters) + self.incoming_queue = IncomingQueue(self.protocol) + self.incoming_queue.start() producer = OutgoingValve(self) self.transport.registerProducer(producer, True) producer.resumeProducing() @@ -177,10 +222,10 @@ def onMessage(self, message, binary): self.sendClose() except: # proper error will be handled in the protocol class - self.protocol.incoming(message) + self.incoming_queue.push(message) else: # no authentication required - self.protocol.incoming(message) + self.incoming_queue.push(message) def outgoing(self, message): if type(message) == bson.BSON: @@ -201,6 +246,7 @@ def onClose(self, was_clean, code, reason): cls = self.__class__ cls.clients_connected -= 1 self.protocol.finish() + self.incoming_queue.finish() if cls.client_manager: cls.client_manager.remove_client(self.client_id, self.peer) rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected)