Skip to content

Commit

Permalink
q-dev: keep partial backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 2, 2024
1 parent f6ad1c3 commit 5cb2a8f
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 20 deletions.
82 changes: 62 additions & 20 deletions qubesusbproxy/core3ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import asyncio
import collections
import fcntl
import grp
Expand All @@ -35,11 +34,52 @@
import tempfile
from typing import List, Optional, Dict, Tuple

import qubes.device_protocol
try:
from qubes.device_protocol import DeviceInfo
from qubes.device_protocol import DeviceInterface
from qubes.ext import utils
from qubes.devices import UnrecognizedDevice
except ImportError:
# This extension supports both the legacy and new device API.
# In the case of the legacy backend, functionality is limited.
from qubes.devices import DeviceInfo as LegacyDeviceInfo
import qubesusbproxy.utils

class DescriptionOverrider:
@property
def description(self):
return self.vendor + " " + self.product

class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo):
def __init__(self, *args, **kwargs):
# not supported options in legacy code
del kwargs['devclass']
kwargs['description'] = 'foo'
self.safe_chars = self.safe_chars.replace(' ', '')
super().__init__(*args, **kwargs)

# needed but not in legacy DeviceInfo
self._vendor = None
self._product = None
self._manufacturer = None
self._name = None
self._serial = None
# `_load_interfaces_from_qubesdb` will never be called
self._interfaces = "?******"

@property
def fronted_domain(self):
return self.attachment

class DeviceInterface:
pass

class UnrecognizedDevice(ValueError):
pass

import qubes.devices
import qubes.ext
import qubes.vm.adminvm
from qubes.ext import utils

usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$")
# should match valid VM name
Expand All @@ -49,9 +89,12 @@
HWDATA_PATH = '/usr/share/hwdata'


class USBDevice(qubes.device_protocol.DeviceInfo):
class USBDevice(DeviceInfo):
# pylint: disable=too-few-public-methods
def __init__(self, backend_domain, ident):
# the superclass can restrict the allowed characters
self.safe_chars = (string.ascii_letters + string.digits
+ string.punctuation + ' ')
super(USBDevice, self).__init__(
backend_domain=backend_domain, ident=ident, devclass="usb")

Expand Down Expand Up @@ -136,7 +179,7 @@ def serial(self) -> str:
return result

@property
def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]:
def interfaces(self) -> List[DeviceInterface]:
"""
List of device interfaces.
Expand All @@ -149,17 +192,16 @@ def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]:
return result

@property
def parent_device(self) -> Optional[qubes.device_protocol.DeviceInfo]:
def parent_device(self) -> Optional[DeviceInfo]:
"""
The parent device, if any.
A USB device has no parents.
"""
return None

def _load_interfaces_from_qubesdb(self) \
-> List[qubes.device_protocol.DeviceInterface]:
result = [qubes.device_protocol.DeviceInterface.unknown()]
def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]:
result = [DeviceInterface.unknown()]
if not self.backend_domain.is_running():
# don't cache this value
return result
Expand All @@ -170,7 +212,7 @@ def _load_interfaces_from_qubesdb(self) \
if not untrusted_interfaces:
return result
self._interfaces = result = [
qubes.device_protocol.DeviceInterface(
DeviceInterface(
self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb"
)
for ifc in untrusted_interfaces.split(b':')
Expand Down Expand Up @@ -219,21 +261,21 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]:
untrusted_product_id)
vendor, product = self._get_vendor_and_product_names(
self._vendor_id, self._product_id)
self._vendor = result["vendor"] = vendor
self._product = result["product"] = product
self._vendor = result["vendor"] = self._sanitize(vendor.encode())
self._product = result["product"] = self._sanitize(product.encode())
self._manufacturer = result["manufacturer"] = (
self._sanitize(untrusted_manufacturer))
self._name = result["name"] = (self._sanitize(untrusted_name))
self._name = result["serial"] = (self._sanitize(untrusted_serial))
self._name = result["name"] = self._sanitize(untrusted_name)
self._name = result["serial"] = self._sanitize(untrusted_serial)
return result

@staticmethod
def _sanitize(
untrusted_device_desc: bytes,
safe_chars: str =
string.ascii_letters + string.digits + string.punctuation + ' '
self, untrusted_device_desc: bytes,
safe_chars: Optional[str] = None
) -> str:
# rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera'
if safe_chars is None:
safe_chars = self.safe_chars
untrusted_device_desc = untrusted_device_desc.decode(
'unicode_escape', errors='ignore')
safe_chars_set = set(safe_chars)
Expand Down Expand Up @@ -434,7 +476,7 @@ async def attach_and_notify(self, vm, device, options):
try:
await self.on_device_attach_usb(
vm, 'device-pre-attach:usb', device, options)
except qubes.devices.UnrecognizedDevice:
except UnrecognizedDevice:
return
await vm.fire_event_async(
'device-attach:usb', device=device, options=options)
Expand Down Expand Up @@ -513,7 +555,7 @@ async def on_device_attach_usb(self, vm, event, device, options):
if identity != 'any' and device.self_identity != identity:
print(f"Unrecognized identity, skipping attachment of {device}",
file=sys.stderr)
raise qubes.devices.UnrecognizedDevice(
raise UnrecognizedDevice(
"Device presented identity "
f"{device.self_identity} "
f"does not match expected {identity}"
Expand Down
102 changes: 102 additions & 0 deletions qubesusbproxy/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# coding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org
#
# Copyright (C) 2023 Piotr Bartman-Szwarc <prbartman@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
import asyncio

import qubes


def device_list_change(
ext: qubes.ext.Extension, current_devices,
vm, path, device_class
):
devclass = device_class.__name__[:-len('Device')].lower()

if path is not None:
vm.fire_event(f'device-list-change:{devclass}')

added, attached, detached, removed = (
compare_device_cache(vm, ext.devices_cache, current_devices))

# send events about devices detached/attached outside by themselves
for dev_id, front_vm in detached.items():
dev = device_class(vm, dev_id)
asyncio.ensure_future(front_vm.fire_event_async(
f'device-detach:{devclass}', device=dev))
for dev_id in removed:
device = device_class(vm, dev_id)
vm.fire_event(f'device-removed:{devclass}', device=device)
for dev_id in added:
device = device_class(vm, dev_id)
vm.fire_event(f'device-added:{devclass}', device=device)
for dev_ident, front_vm in attached.items():
dev = device_class(vm, dev_ident)
# options are unknown, device already attached
asyncio.ensure_future(front_vm.fire_event_async(
f'device-attach:{devclass}', device=dev, options={}))

ext.devices_cache[vm.name] = current_devices

for front_vm in vm.app.domains:
if not front_vm.is_running():
continue
for assignment in front_vm.devices[devclass].get_assigned_devices():
if (assignment.backend_domain == vm
and assignment.ident in added
and assignment.ident not in attached
):
asyncio.ensure_future(ext.attach_and_notify(
front_vm, assignment.device, assignment.options))


def compare_device_cache(vm, devices_cache, current_devices):
# compare cached devices and current devices, collect:
# - newly appeared devices (ident)
# - devices attached from a vm to frontend vm (ident: frontend_vm)
# - devices detached from frontend vm (ident: frontend_vm)
# - disappeared devices, e.g., plugged out (ident)
added = set()
attached = {}
detached = {}
removed = set()
cache = devices_cache[vm.name]
for dev_id, front_vm in current_devices.items():
if dev_id not in cache:
added.add(dev_id)
if front_vm is not None:
attached[dev_id] = front_vm
elif cache[dev_id] != front_vm:
cached_front = cache[dev_id]
if front_vm is None:
detached[dev_id] = cached_front
elif cached_front is None:
attached[dev_id] = front_vm
else:
# a front changed from one to another, so we signal it as:
# detach from the first one and attach to the second one.
detached[dev_id] = cached_front
attached[dev_id] = front_vm

for dev_id, cached_front in cache.items():
if dev_id not in current_devices:
removed.add(dev_id)
if cached_front is not None:
detached[dev_id] = cached_front
return added, attached, detached, removed

0 comments on commit 5cb2a8f

Please sign in to comment.