diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 34f56cc..799bff6 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -5,6 +5,8 @@ # # Copyright (C) 2016 Marek Marczykowski-Górecki # +# Copyright (C) 2024 Piotr Bartman-Szwarc +# # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,8 +21,7 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -import asyncio + import collections import fcntl import grp @@ -28,9 +29,59 @@ import re import string import subprocess +import sys -import errno import tempfile +from typing import List, Optional, Dict, Tuple + +try: + from qubes.device_protocol import DeviceInfo + from qubes.device_protocol import DeviceInterface + from qubes.ext import utils + from qubes.devices import UnrecognizedDevice + + def get_assigned_devices(devices): + yield from devices.get_assigned_devices() +except ImportError: + # This extension supports both the legacy and new device API. + # In the case of the legacy backend, functionality is limited. + from qubes.devices import DeviceInfo as LegacyDeviceInfo + from qubesusbproxy import utils + + class DescriptionOverrider: + @property + def description(self): + return self.name + + class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): + def __init__(self, *args, **kwargs): + # not supported options in legacy code + del kwargs['devclass'] + self.safe_chars = self.safe_chars.replace(' ', '') + super().__init__(*args, **kwargs) + + # needed but not in legacy DeviceInfo + self._vendor = None + self._product = None + self._manufacturer = None + self._name = None + self._serial = None + # `_load_interfaces_from_qubesdb` will never be called + self._interfaces = "?******" + + @property + def frontend_domain(self): + return self.attachment + + class DeviceInterface: + pass + + class UnrecognizedDevice(ValueError): + pass + + def get_assigned_devices(devices): + yield from devices.assignments(persistent=True) + import qubes.devices import qubes.ext @@ -41,36 +92,230 @@ usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') -class USBDevice(qubes.devices.DeviceInfo): +HWDATA_PATH = '/usr/share/hwdata' + + +class USBDevice(DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): - super(USBDevice, self).__init__(backend_domain, ident, None) + # the superclass can restrict the allowed characters + self.safe_chars = (string.ascii_letters + string.digits + + string.punctuation + ' ') + super(USBDevice, self).__init__( + backend_domain=backend_domain, ident=ident, devclass="usb") self._qdb_ident = ident.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident - # lazy loading - self._description = None + self._vendor_id = None + self._product_id = None + + @property + def vendor(self) -> str: + """ + Device vendor from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._vendor is None: + result = self._load_desc_from_qubesdb()["vendor"] + else: + result = self._vendor + return result + + @property + def product(self) -> str: + """ + Device name from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._product is None: + result = self._load_desc_from_qubesdb()["product"] + else: + result = self._product + return result + + @property + def manufacturer(self) -> str: + """ + The name of the manufacturer of the device introduced by device itself + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._manufacturer is None: + result = self._load_desc_from_qubesdb()["manufacturer"] + else: + result = self._manufacturer + return result + + @property + def name(self) -> str: + """ + The name of the device it introduced itself with (could be empty string) + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._name is None: + result = self._load_desc_from_qubesdb()["name"] + else: + result = self._name + return result + + @property + def serial(self) -> str: + """ + The serial number of the device it introduced itself with. + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._serial is None: + result = self._load_desc_from_qubesdb()["serial"] + else: + result = self._serial + return result + @property + def interfaces(self) -> List[DeviceInterface]: + """ + List of device interfaces. + + Every device should have at least one interface. + """ + if self._interfaces is None: + result = self._load_interfaces_from_qubesdb() + else: + result = self._interfaces + return result @property - def description(self): - if self._description is None: - if not self.backend_domain.is_running(): - # don't cache this value - return "Unknown - domain not running" - untrusted_device_desc = self.backend_domain.untrusted_qdb.read( + def parent_device(self) -> Optional[DeviceInfo]: + """ + The parent device, if any. + + A USB device has no parents. + """ + return None + + def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: + result = [DeviceInterface.unknown()] + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_interfaces: bytes = ( + self.backend_domain.untrusted_qdb.read( + self._qdb_path + '/interfaces') + ) + if not untrusted_interfaces: + return result + self._interfaces = result = [ + DeviceInterface( + self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" + ) + for ifc in untrusted_interfaces.split(b':') + if ifc + ] + return result + + def _load_desc_from_qubesdb(self) -> Dict[str, str]: + unknown = "unknown" + result = {"vendor": unknown, + "vendor ID": "0000", + "product": unknown, + "product ID": "0000", + "manufacturer": unknown, + "name": unknown, + "serial": unknown} + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_device_desc: bytes = ( + self.backend_domain.untrusted_qdb.read( self._qdb_path + '/desc') - if not untrusted_device_desc: - return 'Unknown' - self._description = self._sanitize_desc(untrusted_device_desc) - hw_ident_match = usb_device_hw_ident_re.match(self._description) - if hw_ident_match: - self._description = self._description[ - len(hw_ident_match.group(0)):] - return self._description + ) + if not untrusted_device_desc: + return result + try: + (untrusted_vend_prod_id, untrusted_manufacturer, + untrusted_name, untrusted_serial + ) = untrusted_device_desc.split(b' ') + untrusted_vendor_id, untrusted_product_id = ( + untrusted_vend_prod_id.split(b':')) + except ValueError: + # desc doesn't contain correctly formatted data, + # but it is not empty. We cannot parse it, + # but we can still put it to the `name` just to provide + # some information to the user. + untrusted_vendor_id, untrusted_product_id = (b"0000", b"0000") + (untrusted_manufacturer, untrusted_serial) = ( + unknown.encode() for _ in range(2)) + untrusted_name = untrusted_device_desc.replace(b' ', b'_') + + # Data successfully loaded, cache these values + self._vendor_id = result["vendor ID"] = self._sanitize( + untrusted_vendor_id) + self._product_id = result["product ID"] = self._sanitize( + untrusted_product_id) + vendor, product = self._get_vendor_and_product_names( + self._vendor_id, self._product_id) + self._vendor = result["vendor"] = self._sanitize(vendor.encode()) + self._product = result["product"] = self._sanitize(product.encode()) + self._manufacturer = result["manufacturer"] = ( + self._sanitize(untrusted_manufacturer)) + self._name = result["name"] = self._sanitize(untrusted_name) + self._name = result["serial"] = self._sanitize(untrusted_serial) + return result + + def _sanitize( + self, untrusted_device_desc: bytes, + safe_chars: Optional[str] = None + ) -> str: + # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' + if safe_chars is None: + safe_chars = self.safe_chars + safe_chars_set = set(safe_chars) + + result = "" + i = 0 + while i < len(untrusted_device_desc): + c = chr(untrusted_device_desc[i]) + if c == '\\': + i += 1 + if i >= len(untrusted_device_desc): + break + c = chr(untrusted_device_desc[i]) + if c == 'x': + i += 2 + if i >= len(untrusted_device_desc): + break + hex_code = untrusted_device_desc[i - 1: i + 1] + try: + for j in range(2): + if hex_code[j] not in b'0123456789abcdefABCDEF': + raise ValueError() + hex_value = int(hex_code, 16) + c = chr(hex_value) + except ValueError: + c = '_' + + if c in safe_chars_set: + result += c + else: + result += '_' + i += 1 + return result @property - def frontend_domain(self): + def attachment(self): if not self.backend_domain.is_running(): return None untrusted_connected_to = self.backend_domain.untrusted_qdb.read( @@ -90,20 +335,85 @@ def frontend_domain(self): untrusted_connected_to] except KeyError: self.backend_domain.log.warning( - 'Device {} has invalid VM name in connected-to ' - 'property: '.format(self.ident, untrusted_connected_to)) + f'Device {self.ident} has invalid VM name in connected-to ' + f'property: {untrusted_connected_to}') return None return connected_to + @property + def self_identity(self) -> str: + """ + Get identification of a device not related to port. + """ + if self._vendor_id is None: + vendor_id = self._load_desc_from_qubesdb()["vendor ID"] + else: + vendor_id = self._vendor_id + if self._product_id is None: + product_id = self._load_desc_from_qubesdb()["product ID"] + else: + product_id = self._product_id + interfaces = ''.join(repr(ifc) for ifc in self.interfaces) + serial = self.serial if self.serial != "unknown" else "" + return \ + f'{vendor_id}:{product_id}:{serial}:{interfaces}' + @staticmethod - def _sanitize_desc(untrusted_device_desc): - untrusted_device_desc = untrusted_device_desc.decode('ascii', - errors='ignore') - safe_set = set(string.ascii_letters + string.digits + - string.punctuation + ' ') - return ''.join( - c if c in safe_set else '_' for c in untrusted_device_desc - ) + def _get_vendor_and_product_names( + vendor_id: str, product_id: str + ) -> Tuple[str, str]: + """ + Return tuple of vendor's and product's names for the ids. + + If the id is not known, return ("unknown", "unknown"). + """ + return (USBDevice._load_usb_known_devices() + .get(vendor_id, dict()) + .get(product_id, ("unknown", "unknown")) + ) + + @staticmethod + def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: + """ + List of known device vendors, devices and interfaces. + + result[vendor_id][device_id] = (vendor_name, product_name) + """ + # Syntax: + # vendor vendor_name <-- 2 spaces between + # device device_name <-- single tab + # interface interface_name <-- two tabs + # ... + # C class class_name + # subclass subclass_name <-- single tab + # prog-if prog-if_name <-- two tabs + result = {} + with open(HWDATA_PATH + '/usb.ids', + encoding='utf-8', errors='ignore') as usb_ids: + for line in usb_ids.readlines(): + line = line.rstrip() + if line.startswith('#'): + # skip comments + continue + elif not line: + # skip empty lines + continue + elif line.startswith('\t\t'): + # skip interfaces + continue + elif line.startswith('C '): + # description of classes starts here, we can finish + break + elif line.startswith('\t'): + # save vendor, device pair + device_id, _, device_name = line[1:].split(' ', 2) + result[vendor_id][device_id] = vendor_name, device_name + else: + # new vendor + vendor_id, _, vendor_name = line[:].split(' ', 2) + result[vendor_id] = {} + + return result class USBProxyNotInstalled(qubes.exc.QubesException): @@ -170,7 +480,7 @@ class USBDeviceExtension(qubes.ext.Extension): def __init__(self): super(USBDeviceExtension, self).__init__() - #include dom0 devices in listing only when usb-proxy is really + # include dom0 devices in listing only when usb-proxy is really # installed there self.usb_proxy_installed_in_dom0 = os.path.exists( '/etc/qubes-rpc/qubes.USB') @@ -178,72 +488,37 @@ def __init__(self): @qubes.ext.handler('domain-init', 'domain-load') def on_domain_init_load(self, vm, event): - '''Initialize watching for changes''' + """Initialize watching for changes""" # pylint: disable=unused-argument,no-self-use vm.watch_qdb_path('/qubes-usb-devices') if event == 'domain-load': # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet - current_devices = dict((dev.ident, dev.frontend_domain) - for dev in self.on_device_list_usb(vm, None)) + current_devices = { + dev.ident: dev.attachment + for dev in self.on_device_list_usb(vm, None) + } self.devices_cache[vm.name] = current_devices else: self.devices_cache[vm.name] = {} - async def _attach_and_notify(self, vm, device, options): + async def attach_and_notify(self, vm, device, options): # bypass DeviceCollection logic preventing double attach - await self.on_device_attach_usb(vm, - 'device-pre-attach:usb', device, options) - await vm.fire_event_async('device-attach:usb', - device=device, - options=options) + try: + await self.on_device_attach_usb( + vm, 'device-pre-attach:usb', device, options) + except UnrecognizedDevice: + return + await vm.fire_event_async( + 'device-attach:usb', device=device, options=options) @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - '''A change in QubesDB means a change in device list''' + """A change in QubesDB means a change in a device list.""" # pylint: disable=unused-argument,no-self-use - vm.fire_event('device-list-change:usb') - current_devices = dict((dev.ident, dev.frontend_domain) - for dev in self.on_device_list_usb(vm, None)) - - # compare cached devices and current devices, collect: - # - newly appeared devices - # - devices disconnected from a vm - # - devices connected to a vm - new_devices = set() - connected_devices = dict() - disconnected_devices = dict() - devices_cache_for_vm = self.devices_cache[vm.name] - for dev, connected_to in current_devices.items(): - if dev not in devices_cache_for_vm: - new_devices.add(dev) - elif devices_cache_for_vm[dev] != current_devices[dev]: - if devices_cache_for_vm[dev] is not None: - disconnected_devices[dev] = devices_cache_for_vm[dev] - if current_devices[dev] is not None: - connected_devices[dev] = current_devices[dev] - - self.devices_cache[vm.name] = current_devices - # send events about devices detached/attached outside by themselves - # (like device pulled out or manual qubes.USB qrexec call) - for dev_ident, front_vm in disconnected_devices.items(): - dev = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async('device-detach:usb', - device=dev)) - for dev_ident, front_vm in connected_devices.items(): - dev = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async('device-attach:usb', - device=dev, - options={})) - for front_vm in vm.app.domains: - if not front_vm.is_running(): - continue - for assignment in front_vm.devices['usb'].assignments( - persistent=True): - if assignment.backend_domain == vm and \ - assignment.ident in new_devices: - asyncio.ensure_future(self._attach_and_notify( - front_vm, assignment.device, assignment.options)) + current_devices = dict((dev.ident, dev.attachment) + for dev in self.on_device_list_usb(vm, None)) + utils.device_list_change(self, current_devices, vm, path, USBDevice) @qubes.ext.handler('device-list:usb') def on_device_list_usb(self, vm, event): @@ -259,7 +534,7 @@ def on_device_list_usb(self, vm, event): untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/') if not untrusted_dev_list: return - # just get list of devices, not its every property + # just get a list of devices, not its every property untrusted_dev_list = \ set(path.split('/')[2] for path in untrusted_dev_list) for untrusted_qdb_ident in untrusted_dev_list: @@ -296,30 +571,47 @@ def on_device_list_attached(self, vm, event, **kwargs): return for dev in self.get_all_devices(vm.app): - if dev.frontend_domain == vm: - yield (dev, {}) + if dev.attachment == vm: + yield (dev, {'identity': dev.self_identity}) @qubes.ext.handler('device-pre-attach:usb') async def on_device_attach_usb(self, vm, event, device, options): # pylint: disable=unused-argument + + if options: + if list(options.keys()) != ['identity']: + raise qubes.exc.QubesException( + 'USB device attach do not support user options') + identity = options['identity'] + if identity != 'any' and device.self_identity != identity: + print(f"Unrecognized identity, skipping attachment of {device}", + file=sys.stderr) + raise UnrecognizedDevice( + "Device presented identity " + f"{device.self_identity} " + f"does not match expected {identity}" + ) + if not vm.is_running() or vm.qid == 0: + # print(f"Qube is not running, skipping attachment of {device}", + # file=sys.stderr) return if not isinstance(device, USBDevice): + # print("The device is not recognized as usb device, " + # f"skipping attachment of {device}", + # file=sys.stderr) return - if options: - raise qubes.exc.QubesException( - 'USB device attach do not support options') - - if device.frontend_domain: + if device.attachment: raise qubes.devices.DeviceAlreadyAttached( - 'Device {!s} already attached to {!s}'.format(device, - device.frontend_domain) + 'Device {!s} already attached to {!s}'.format( + device, device.attachment) ) - stubdom_qrexec = (vm.virt_mode == 'hvm' and \ - vm.features.check_with_template('stubdom-qrexec', False)) + stubdom_qrexec = ( + vm.virt_mode == 'hvm' + and vm.features.check_with_template('stubdom-qrexec', False)) name = vm.name + '-dm' if stubdom_qrexec else vm.name @@ -366,7 +658,7 @@ async def on_device_detach_usb(self, vm, event, device): if not isinstance(device, USBDevice): return - connected_to = device.frontend_domain + connected_to = device.attachment # detect race conditions; there is still race here, but much smaller if connected_to is None or connected_to.qid != vm.qid: raise QubesUSBException( @@ -389,9 +681,9 @@ async def on_device_detach_usb(self, vm, event, device): @qubes.ext.handler('domain-start') async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - for assignment in vm.devices['usb'].assignments(persistent=True): - device = assignment.device - await self.on_device_attach_usb(vm, '', device, options={}) + for assignment in get_assigned_devices(vm.devices['usb']): + await self.attach_and_notify( + vm, assignment.device, assignment.options) @qubes.ext.handler('domain-shutdown') async def on_domain_shutdown(self, vm, _event, **_kwargs): diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index f9ccb12..79edf44 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -27,16 +27,42 @@ core2 = False core3 = False +legacy = False try: import qubesusbproxy.core3ext - import qubes.devices import asyncio + + try: + from qubes.device_protocol import DeviceAssignment + + def assign(test, collection, assignment): + test.loop.run_until_complete(collection.assign(assignment)) + + def unassign(test, collection, assignment): + test.loop.run_until_complete(collection.unassign(assignment)) + + AUTO_ATTACH = {"attach_automatically": True, "required": True} + except ImportError: + # This extension supports both the legacy and new device API. + # In the case of the legacy backend, functionality is limited. + from qubes.devices import DeviceAssignment + + def assign(test, collection, assignment): + test.loop.run_until_complete(collection.attach(assignment)) + + def unassign(test, collection, assignment): + test.loop.run_until_complete(collection.detach(assignment)) + + legacy = True + AUTO_ATTACH = {"persistent": True} + core3 = True except ImportError: pass try: import qubes.qubesutils + core2 = True except ImportError: pass @@ -49,7 +75,6 @@ except FileNotFoundError: pass - GADGET_PREREQ = '&&'.join([ "modprobe dummy_hcd", "modprobe usb_f_mass_storage", @@ -76,15 +101,16 @@ "sleep 2; udevadm settle", ]) + def create_usb_gadget(vm): vm.start() p = vm.run(GADGET_PREREQ, user="root", - passio_popen=True, passio_stderr=True) - (_, stderr) = p.communicate() + passio_popen=True, passio_stderr=True) + (_, _stderr) = p.communicate() if p.returncode != 0: raise unittest.SkipTest("missing USB Gadget subsystem") p = vm.run(GADGET_PREPARE, user="root", - passio_popen=True, passio_stderr=True) + passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to setup USB gadget: " + stderr.decode()) @@ -97,14 +123,16 @@ def create_usb_gadget(vm): raise RuntimeError("Failed to get dummy device ID") return stdout + def remove_usb_gadget(vm): assert vm.is_running() retcode = vm.run("echo > /sys/kernel/config/usb_gadget/test_g1/UDC", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to disable USB gadget") + def recreate_usb_gadget(vm): '''Re-create the gadget previously created with *create_usb_gadget*, then removed with *remove_usb_gadget*. @@ -115,11 +143,10 @@ def recreate_usb_gadget(vm): "mkdir test_g1; cd test_g1", "echo dummy_udc.0 > UDC", "sleep 2; udevadm settle", - ]) - + ]) p = vm.run(reconnect, user="root", - passio_popen=True, passio_stderr=True) + passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to re-create USB gadget: " + stderr.decode()) @@ -139,53 +166,64 @@ def test_000_attach_detach(self): self.frontend.start() # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBDetach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBDetach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + "Device disconnection failed") def test_010_attach_detach_vid_pid(self): self.frontend.start() # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, "0x1234.0x1234")), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + "0x1234.0x1234")), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 0, - "Device connection failed") + "Device connection failed") # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format(self.backend.name, "0x1234.0x1234")), 0, - "qubes.USBDetach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + "0x1234.0x1234")), 0, + "qubes.USBDetach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + "Device disconnection failed") def test_020_detach_on_remove(self): self.frontend.start() self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") remove_usb_gadget(self.backend) # FIXME: usb-export script may update qubesdb/disconnect with 1sec delay time.sleep(2) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device not cleaned up") + "Device not cleaned up") # TODO: check for kernel errors? + class TC_10_USBProxy_core2(qubes.tests.extra.ExtraTestCase): def setUp(self): super(TC_10_USBProxy_core2, self).setUp() @@ -208,29 +246,30 @@ def test_020_attach(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, - self.frontend, usb_list[self.usbdev_name]) + self.frontend, + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertEquals(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.frontend) def test_030_detach(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) qubes.qubesutils.usb_detach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) # FIXME: usb-export script may update qubesdb with 1sec delay time.sleep(2) @@ -238,15 +277,15 @@ def test_030_detach(self): self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_040_detach_all(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -258,8 +297,8 @@ def test_040_detach_all(self): self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_050_list_attached(self): """ Attached device should not be listed as further attachable """ @@ -267,24 +306,25 @@ def test_050_list_attached(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) usb_list_front_pre = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + vm=self.frontend) try: qubes.qubesutils.usb_attach(self.qc, - self.frontend, usb_list[self.usbdev_name]) + self.frontend, + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertEquals(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.frontend) usb_list_front_post = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + vm=self.frontend) self.assertEquals(usb_list_front_pre, usb_list_front_post) @@ -293,7 +333,7 @@ def test_060_auto_detach_on_remove(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -304,33 +344,33 @@ def test_060_auto_detach_on_remove(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertNotIn(self.usbdev_name, usb_list) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) with self.assertRaises(qubes.qubesutils.USBProxyNotInstalled): qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) @unittest.expectedFailure def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled: pass except Exception as e: @@ -362,29 +402,27 @@ def test_000_list(self): usb_list = self.backend.devices['usb'] self.assertIn(self.usbdev_name, [str(dev) for dev in usb_list]) - def test_010_attach_offline(self): + def test_010_assign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) - self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) - self.assertIsNone(usb_dev.frontend_domain) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) + assign(self, self.frontend.devices['usb'], ass) + self.assertIsNone(usb_dev.attachment) try: self.frontend.start() except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_dev.frontend_domain, - self.frontend) + self.assertEqual(usb_dev.attachment, self.frontend) def test_020_attach(self): self.frontend.start() usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -392,16 +430,15 @@ def test_020_attach(self): self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_dev.frontend_domain, - self.frontend) + self.assertEquals(usb_dev.attachment, self.frontend) def test_030_detach(self): self.frontend.start() usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -413,22 +450,20 @@ def test_030_detach(self): # FIXME: usb-export script may update qubesdb with 1sec delay self.loop.run_until_complete(asyncio.sleep(2)) - self.assertIsNone(usb_dev.frontend_domain) + self.assertIsNone(usb_dev.attachment) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") - def test_040_detach_offline(self): + def test_040_unassign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) - self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) - self.assertIsNone(usb_dev.frontend_domain) - self.loop.run_until_complete( - self.frontend.devices['usb'].detach(ass)) - self.assertIsNone(usb_dev.frontend_domain) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) + assign(self, self.frontend.devices['usb'], ass) + self.assertIsNone(usb_dev.attachment) + unassign(self, self.frontend.devices['usb'], ass) + self.assertIsNone(usb_dev.attachment) def test_050_list_attached(self): """ Attached device should not be listed as further attachable """ @@ -436,7 +471,7 @@ def test_050_list_attached(self): usb_list = self.backend.devices['usb'] usb_list_front_pre = list(self.frontend.devices['usb']) - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( @@ -445,11 +480,10 @@ def test_050_list_attached(self): self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_list[self.usbdev_ident].frontend_domain, - self.frontend) + self.assertEquals(usb_list[self.usbdev_ident].attachment, self.frontend) usb_list_front_post = list(self.frontend.devices['usb']) @@ -458,7 +492,7 @@ def test_050_list_attached(self): def test_060_auto_detach_on_remove(self): self.frontend.start() usb_list = self.backend.devices['usb'] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -471,17 +505,16 @@ def test_060_auto_detach_on_remove(self): self.assertNotIn(self.usbdev_name, [str(dev) for dev in usb_list]) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_061_auto_attach_on_reconnect(self): self.frontend.start() usb_list = self.backend.devices['usb'] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) try: - self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + assign(self, self.frontend.devices['usb'], ass) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -501,17 +534,17 @@ def test_061_auto_attach_on_reconnect(self): self.assertGreater(timeout, 0, 'timeout on device create') self.loop.run_until_complete(asyncio.sleep(5)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device reconnection failed") + wait=True), 0, + "Device reconnection failed") def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -521,10 +554,10 @@ def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( @@ -536,10 +569,12 @@ def test_075_attach_not_installed_back(self): def test_080_attach_existing_policy(self): self.frontend.start() # this override policy file, but during normal execution it shouldn't - # exist, so should be ok, especially on testing system - with open('/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), 'w+') as policy_file: + # exist, so should be ok, especially on a testing system + with open( + '/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), + 'w+') as policy_file: policy_file.write('# empty policy\n') - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -548,8 +583,7 @@ def test_090_attach_stubdom(self): self.frontend.virt_mode = 'hvm' self.frontend.features['stubdom-qrexec'] = True self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -558,8 +592,9 @@ def test_090_attach_stubdom(self): time.sleep(5) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") + def list_tests(): tests = [TC_00_USBProxy] diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py new file mode 100644 index 0000000..da49b4f --- /dev/null +++ b/qubesusbproxy/utils.py @@ -0,0 +1,103 @@ +# coding=utf-8 +# +# The Qubes OS Project, https://www.qubes-os.org +# +# Copyright (C) 2023 Piotr Bartman-Szwarc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +import asyncio + +import qubes + + +def device_list_change( + ext: qubes.ext.Extension, current_devices, + vm, path, device_class +): + devclass = device_class.__name__[:-len('Device')].lower() + + if path is not None: + vm.fire_event(f'device-list-change:{devclass}') + + added, attached, detached, removed = ( + compare_device_cache(vm, ext.devices_cache, current_devices)) + + # send events about devices detached/attached outside by themselves + for dev_id, front_vm in detached.items(): + dev = device_class(vm, dev_id) + asyncio.ensure_future(front_vm.fire_event_async( + f'device-detach:{devclass}', device=dev)) + for dev_id in removed: + device = device_class(vm, dev_id) + vm.fire_event(f'device-removed:{devclass}', device=device) + for dev_id in added: + device = device_class(vm, dev_id) + vm.fire_event(f'device-added:{devclass}', device=device) + for dev_ident, front_vm in attached.items(): + dev = device_class(vm, dev_ident) + # options are unknown, device already attached + asyncio.ensure_future(front_vm.fire_event_async( + f'device-attach:{devclass}', device=dev, options={})) + + ext.devices_cache[vm.name] = current_devices + + for front_vm in vm.app.domains: + if not front_vm.is_running(): + continue + for assignment in front_vm.devices[devclass].assignments( + persistent=True): + if (assignment.backend_domain == vm + and assignment.ident in added + and assignment.ident not in attached + ): + asyncio.ensure_future(ext.attach_and_notify( + front_vm, assignment.device, assignment.options)) + + +def compare_device_cache(vm, devices_cache, current_devices): + # compare cached devices and current devices, collect: + # - newly appeared devices (ident) + # - devices attached from a vm to frontend vm (ident: frontend_vm) + # - devices detached from frontend vm (ident: frontend_vm) + # - disappeared devices, e.g., plugged out (ident) + added = set() + attached = {} + detached = {} + removed = set() + cache = devices_cache[vm.name] + for dev_id, front_vm in current_devices.items(): + if dev_id not in cache: + added.add(dev_id) + if front_vm is not None: + attached[dev_id] = front_vm + elif cache[dev_id] != front_vm: + cached_front = cache[dev_id] + if front_vm is None: + detached[dev_id] = cached_front + elif cached_front is None: + attached[dev_id] = front_vm + else: + # a front changed from one to another, so we signal it as: + # detach from the first one and attach to the second one. + detached[dev_id] = cached_front + attached[dev_id] = front_vm + + for dev_id, cached_front in cache.items(): + if dev_id not in current_devices: + removed.add(dev_id) + if cached_front is not None: + detached[dev_id] = cached_front + return added, attached, detached, removed