Skip to content

Commit

Permalink
q-dev: DeviceInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed May 26, 2024
1 parent 141ca73 commit e5a4420
Showing 1 changed file with 153 additions and 48 deletions.
201 changes: 153 additions & 48 deletions qubesadmin/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,48 +120,147 @@ def devclass(self, devclass: str):
self.__bus = devclass


class DeviceInterface(Enum):
# USB interfaces:
# https://www.usb.org/defined-class-codes#anchor_BaseClass03h
Other = "******"
USB_Audio = "01****"
USB_CDC = "02****" # Communications Device Class
USB_HID = "03****"
USB_HID_Keyboard = "03**01"
USB_HID_Mouse = "03**02"
# USB_Physical = "05****"
# USB_Still_Imaging = "06****" # Camera
USB_Printer = "07****"
USB_Mass_Storage = "08****"
USB_Hub = "09****"
USB_CDC_Data = "0a****"
USB_Smart_Card = "0b****"
# USB_Content_Security = "0d****"
USB_Video = "0e****" # Video Camera
# USB_Personal_Healthcare = "0f****"
USB_Audio_Video = "10****"
# USB_Billboard = "11****"
# USB_C_Bridge = "12****"
# and more...
class DeviceCategory(Enum):
"""
Arbitrarily selected interfaces that are important to users,
thus deserving special recognition such as a custom icon, etc.
"""
Other = "*******"

Communication = ("u02****", "p07****") # eg. modems
Input = ("u03****", "p09****") # HID etc.
Keyboard = ("u03**01", "p0900**")
Mouse = ("u03**02", "p0902**")
Printer = ("u07****",)
Scanner = ("p0903**",)
# Multimedia = Audio, Video, Displays etc.
Multimedia = ("u01****", "u0e****", "u06****", "u10****", "p03****",
"p04****")
Wireless = ("ue0****", "p0d****")
Bluetooth = ("ue00101", "p0d11**")
Mass_Data = ("b******", "u08****", "p01****")
Network = ("p02****",)
Memory = ("p05****",)
PCI_Bridge = ("p06****",)
Docking_Station = ("p0a****",)
Processor = ("p0b****", "p40****")
PCI_Serial_Bus = ("p0c****",)
PCI_USB = ("p0c03**",)

@staticmethod
def from_str(interface_encoding: str) -> 'DeviceInterface':
result = DeviceInterface.Other
def from_str(interface_encoding: str) -> 'DeviceCategory':
result = DeviceCategory.Other
if len(interface_encoding) != len(DeviceCategory.Other.value):
return result
best_score = 0

for interface in DeviceInterface:
pattern = interface.value
score = 0
for t, p in zip(interface_encoding, pattern):
if t == p:
score += 1
elif p != "*":
score = -1 # inconsistent with pattern
break
for interface in DeviceCategory:
for pattern in interface.value:
score = 0
for t, p in zip(interface_encoding, pattern):
if t == p:
score += 1
elif p != "*":
score = -1 # inconsistent with pattern
break

if score > best_score:
best_score = score
result = interface

return result


class DeviceInterface:
def __init__(self, interface_encoding: str, devclass: Optional[str] = None):
ifc_padded = interface_encoding.ljust(6, '*')
if devclass:
if len(ifc_padded) > 6:
print(
f"interface_encoding is too long "
f"(is {len(interface_encoding)}, expected max. 6) "
f"for given {devclass=}",
file=sys.stderr
)
ifc_full = devclass[0] + ifc_padded
else:
known_devclasses = {'p': 'pci', 'u': 'usb', 'b': 'block'}
devclass = known_devclasses.get(interface_encoding[0], None)
if len(ifc_padded) > 7:
print(
f"interface_encoding is too long "
f"(is {len(interface_encoding)}, expected max. 7)",
file=sys.stderr
)
ifc_full = ifc_padded
elif len(ifc_padded) == 6:
ifc_full = ' ' + ifc_padded
else:
ifc_full = ifc_padded

self._devclass = devclass
self._interface_encoding = ifc_full
self._category = DeviceCategory.from_str(self._interface_encoding)

@property
def devclass(self) -> Optional[str]:
""" Immutable Device class such like: 'usb', 'pci' etc. """
return self._devclass

@property
def category(self) -> DeviceCategory:
""" Immutable Device category such like: 'Mouse', 'Mass_Data' etc. """
return self._category

@classmethod
def unknown(cls) -> 'DeviceInterface':
""" Value for unknown device interface. """
return cls(" ******")

@property
def __repr__(self):
return self._interface_encoding

@property
def __str__(self):
if self.devclass == "block":
return "Block device"
if self.devclass in ("usb", "pci"):
self._load_classes(self.devclass).get(
self._interface_encoding[1:],
f"Unclassified {self.devclass} device")
return repr(self)

if score > best_score:
best_score = score
result = interface
@staticmethod
def _load_classes(bus: str):
"""
List of known device classes, subclasses and programming interfaces.
"""
# Syntax:
# C class class_name
# subclass subclass_name <-- single tab
# prog-if prog-if_name <-- two tabs
result = {}
with open(f'/usr/share/hwdata/{bus}.ids',
encoding='utf-8', errors='ignore') as pciids:
class_id = None
subclass_id = None
for line in pciids.readlines():
line = line.rstrip()
if line.startswith('\t\t') and class_id and subclass_id:
(progif_id, _, progif_name) = line[2:].split(' ', 2)
result[class_id + subclass_id + progif_id] = \
f"{class_name}: {subclass_name} ({progif_name})"
elif line.startswith('\t') and class_id:
(subclass_id, _, subclass_name) = line[1:].split(' ', 2)
# store both prog-if specific entry and generic one
result[class_id + subclass_id + '**'] = \
f"{class_name}: {subclass_name}"
elif line.startswith('C '):
(_, class_id, _, class_name) = line.split(' ', 3)
result[class_id + '****'] = class_name
subclass_id = None

return result

Expand Down Expand Up @@ -277,8 +376,8 @@ def description(self) -> str:
prod = self.name
elif self.serial and self.serial != "unknown":
prod = self.serial
elif self._parent is not None:
return f"sub-device of {self._parent}"
elif self.parent_device is not None:
return f"sub-device of {self.parent_device}"
else:
prod = f"unknown {self.devclass if self.devclass else ''} device"

Expand All @@ -299,7 +398,7 @@ def interfaces(self) -> List[DeviceInterface]:
Every device should have at least one interface.
"""
if not self._interfaces:
return [DeviceInterface.Other]
return [DeviceInterface.unknown()]
return self._interfaces

@property
Expand Down Expand Up @@ -357,7 +456,7 @@ def serialize(self) -> bytes:
backend_domain_name.encode('ascii'))
properties += b' ' + base64.b64encode(backend_domain_prop)

interfaces = ''.join(ifc.value for ifc in self.interfaces)
interfaces = ''.join(repr(ifc) for ifc in self.interfaces)
interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii')
properties += b' ' + base64.b64encode(interfaces_prop)

Expand Down Expand Up @@ -385,7 +484,7 @@ def deserialize(
result = DeviceInfo._deserialize(
cls, serialization, expected_backend_domain, expected_devclass)
except Exception as exc:
print(exc, file=sys.stderr) # TODO
print(exc, file=sys.stderr)
ident = serialization.split(b' ')[0].decode(
'ascii', errors='ignore')
result = UnknownDevice(
Expand Down Expand Up @@ -415,15 +514,18 @@ def _deserialize(
properties[key] = param

if properties['backend_domain'] != expected_backend_domain.name:
raise ValueError("TODO") # TODO
raise ValueError("Unexpected device backend domain: "
f"{properties['backend_domain']} != "
f"{expected_backend_domain.name}")
properties['backend_domain'] = expected_backend_domain
# if expected_devclass and properties['devclass'] != expected_devclass:
# raise ValueError("TODO") # TODO
if expected_devclass and properties['devclass'] != expected_devclass:
raise ValueError("Unexpected device class: "
f"{properties['devclass']} != {expected_devclass}")

interfaces = properties['interfaces']
interfaces = [
DeviceInterface.from_str(interfaces[i:i + 6])
for i in range(0, len(interfaces), 6)]
DeviceInterface(interfaces[i:i + 7])
for i in range(0, len(interfaces), 7)]
properties['interfaces'] = interfaces

if 'parent' in properties:
Expand Down Expand Up @@ -451,10 +553,13 @@ class DeviceAssignment(Device):
""" Maps a device to a frontend_domain. """

def __init__(self, backend_domain, ident, options=None, persistent=False,
frontend_domain=None, devclass=None):
frontend_domain=None, devclass=None,
required=False, attach_automatically=False):
super().__init__(backend_domain, ident, devclass)
self.__options = options or {}
self.persistent = persistent
self.__required = required
self.__attach_automatically = attach_automatically
self.__frontend_domain = frontend_domain

def clone(self):
Expand Down

0 comments on commit e5a4420

Please sign in to comment.