|
| 1 | +# Copyright 2022 Mycroft AI Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +# |
| 15 | +"""GUI message bus implementation |
| 16 | +
|
| 17 | +The basic mechanism is: |
| 18 | + 1) GUI client connects to the core messagebus |
| 19 | + 2) Core prepares a port for a socket connection to this GUI |
| 20 | + 3) The availability of the port is sent over the Core |
| 21 | + 4) The GUI connects to the GUI message bus websocket |
| 22 | + 5) Connection persists for graphical interaction indefinitely |
| 23 | +
|
| 24 | +If the connection is lost, it must be renegotiated and restarted. |
| 25 | +""" |
| 26 | +import asyncio |
| 27 | +import json |
| 28 | +from threading import Lock |
| 29 | + |
| 30 | +from tornado import ioloop |
| 31 | +from tornado.options import parse_command_line |
| 32 | +from tornado.web import Application |
| 33 | +from tornado.websocket import WebSocketHandler |
| 34 | + |
| 35 | +from mycroft.configuration import Configuration |
| 36 | +from mycroft.messagebus import Message |
| 37 | +from mycroft.util.log import LOG |
| 38 | +from mycroft.util.process_utils import create_daemon |
| 39 | + |
| 40 | +write_lock = Lock() |
| 41 | + |
| 42 | + |
| 43 | +def get_gui_websocket_config(): |
| 44 | + """Retrieves the configuration values for establishing a GUI message bus""" |
| 45 | + config = Configuration.get() |
| 46 | + websocket_config = config["gui_websocket"] |
| 47 | + |
| 48 | + return websocket_config |
| 49 | + |
| 50 | + |
| 51 | +def create_gui_service(enclosure) -> Application: |
| 52 | + """Initiate a websocket for communicating with the GUI service.""" |
| 53 | + LOG.info('Starting message bus for GUI...') |
| 54 | + websocket_config = get_gui_websocket_config() |
| 55 | + # Disable all tornado logging so mycroft loglevel isn't overridden |
| 56 | + parse_command_line(['--logging=None']) |
| 57 | + |
| 58 | + routes = [(websocket_config['route'], GUIWebsocketHandler)] |
| 59 | + application = Application(routes, debug=True) |
| 60 | + application.enclosure = enclosure |
| 61 | + application.listen( |
| 62 | + websocket_config['base_port'], websocket_config['host'] |
| 63 | + ) |
| 64 | + |
| 65 | + create_daemon(ioloop.IOLoop.instance().start) |
| 66 | + LOG.info('GUI Message bus started!') |
| 67 | + return application |
| 68 | + |
| 69 | + |
| 70 | +def send_message_to_gui(message): |
| 71 | + """Sends the supplied message to all connected GUI clients.""" |
| 72 | + for connection in GUIWebsocketHandler.clients: |
| 73 | + try: |
| 74 | + connection.send(message) |
| 75 | + except Exception as e: |
| 76 | + LOG.exception(repr(e)) |
| 77 | + |
| 78 | + |
| 79 | +def determine_if_gui_connected(): |
| 80 | + """Returns True if any clients are connected to the GUI bus.""" |
| 81 | + return len(GUIWebsocketHandler.clients) > 0 |
| 82 | + |
| 83 | + |
| 84 | +class GUIWebsocketHandler(WebSocketHandler): |
| 85 | + """Defines the websocket pipeline between the GUI and Mycroft.""" |
| 86 | + clients = [] |
| 87 | + |
| 88 | + def open(self): |
| 89 | + GUIWebsocketHandler.clients.append(self) |
| 90 | + LOG.info('New Connection opened!') |
| 91 | + self.synchronize() |
| 92 | + |
| 93 | + def on_close(self): |
| 94 | + LOG.info('Closing {}'.format(id(self))) |
| 95 | + GUIWebsocketHandler.clients.remove(self) |
| 96 | + |
| 97 | + def synchronize(self): |
| 98 | + """ Upload namespaces, pages and data to the last connected. """ |
| 99 | + namespace_pos = 0 |
| 100 | + enclosure = self.application.enclosure |
| 101 | + |
| 102 | + for namespace in enclosure.active_namespaces: |
| 103 | + LOG.info(f'Sync {namespace.name}') |
| 104 | + # Insert namespace |
| 105 | + self.send({"type": "mycroft.session.list.insert", |
| 106 | + "namespace": "mycroft.system.active_skills", |
| 107 | + "position": namespace_pos, |
| 108 | + "data": [{"skill_id": namespace.name}] |
| 109 | + }) |
| 110 | + # Insert pages |
| 111 | + self.send({"type": "mycroft.gui.list.insert", |
| 112 | + "namespace": namespace.name, |
| 113 | + "position": 0, |
| 114 | + "data": [{"url": p.url} for p in namespace.pages] |
| 115 | + }) |
| 116 | + # Insert data |
| 117 | + for key, value in namespace.data.items(): |
| 118 | + self.send({"type": "mycroft.session.set", |
| 119 | + "namespace": namespace.name, |
| 120 | + "data": {key: value} |
| 121 | + }) |
| 122 | + namespace_pos += 1 |
| 123 | + |
| 124 | + def on_message(self, message): |
| 125 | + LOG.info("Received: {message}") |
| 126 | + msg = json.loads(message) |
| 127 | + if (msg.get('type') == "mycroft.events.triggered" and |
| 128 | + (msg.get('event_name') == 'page_gained_focus' or |
| 129 | + msg.get('event_name') == 'system.gui.user.interaction')): |
| 130 | + # System event, a page was changed |
| 131 | + event_name = msg.get('event_name') |
| 132 | + if event_name == 'page_gained_focus': |
| 133 | + msg_type = 'gui.page_gained_focus' |
| 134 | + else: |
| 135 | + msg_type = 'gui.page_interaction' |
| 136 | + |
| 137 | + msg_data = {'namespace': msg['namespace'], |
| 138 | + 'page_number': msg['parameters'].get('number'), |
| 139 | + 'skill_id': msg['parameters'].get('skillId')} |
| 140 | + elif msg.get('type') == "mycroft.events.triggered": |
| 141 | + # A normal event was triggered |
| 142 | + msg_type = '{}.{}'.format(msg['namespace'], msg['event_name']) |
| 143 | + msg_data = msg['parameters'] |
| 144 | + |
| 145 | + elif msg.get('type') == 'mycroft.session.set': |
| 146 | + # A value was changed send it back to the skill |
| 147 | + msg_type = '{}.{}'.format(msg['namespace'], 'set') |
| 148 | + msg_data = msg['data'] |
| 149 | + |
| 150 | + message = Message(msg_type, msg_data) |
| 151 | + LOG.info('Forwarding to bus...') |
| 152 | + self.application.enclosure.core_bus.emit(message) |
| 153 | + LOG.info('Done!') |
| 154 | + |
| 155 | + def write_message(self, *arg, **kwarg): |
| 156 | + """Wraps WebSocketHandler.write_message() with a lock. """ |
| 157 | + try: |
| 158 | + asyncio.get_event_loop() |
| 159 | + except RuntimeError: |
| 160 | + asyncio.set_event_loop(asyncio.new_event_loop()) |
| 161 | + |
| 162 | + with write_lock: |
| 163 | + super().write_message(*arg, **kwarg) |
| 164 | + |
| 165 | + def send(self, data): |
| 166 | + """Send the given data across the socket as JSON |
| 167 | +
|
| 168 | + Args: |
| 169 | + data (dict): Data to transmit |
| 170 | + """ |
| 171 | + s = json.dumps(data) |
| 172 | + #LOG.info('Sending {}'.format(s)) |
| 173 | + self.write_message(s) |
| 174 | + |
| 175 | + def check_origin(self, origin): |
| 176 | + """Disable origin check to make js connections work.""" |
| 177 | + return True |
0 commit comments