Skip to content

Commit

Permalink
q-dev: update DeviceInfo API
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed May 26, 2024
1 parent 73d9197 commit a64ca39
Showing 1 changed file with 60 additions and 33 deletions.
93 changes: 60 additions & 33 deletions qubesadmin/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class DeviceInfo(Device):
# pylint: disable=too-few-public-methods
def __init__(
self,
backend_domain: 'qubesadmin.vm.QubesVM', # TODO
backend_domain: 'qubes.vm.qubesvm.QubesVM', # TODO
ident: str,
devclass: Optional[str] = None,
vendor: Optional[str] = None,
Expand All @@ -181,17 +181,18 @@ def __init__(
name: Optional[str] = None,
serial: Optional[str] = None,
interfaces: Optional[List[DeviceInterface]] = None,
parent: Optional[Device] = None,
**kwargs
):
super().__init__(backend_domain, ident, devclass)

# it's always better to print "unknown" than "None" or empty string.
self._vendor = vendor or "unknown"
self._product = product or "unknown"
self._manufacturer = manufacturer or "unknown"
self._name = name or "unknown"
self._serial = serial or "unknown"
self._interfaces = interfaces or [DeviceInterface.Other]
self._vendor = vendor
self._product = product
self._manufacturer = manufacturer
self._name = name
self._serial = serial
self._interfaces = interfaces
self._parent = parent

self.data = kwargs

Expand All @@ -204,6 +205,8 @@ def vendor(self) -> str:
Override this method to return proper name from `/usr/share/hwdata/*`.
"""
if not self._vendor:
return "unknown"
return self._vendor

@property
Expand All @@ -215,6 +218,8 @@ def product(self) -> str:
Override this method to return proper name from `/usr/share/hwdata/*`.
"""
if not self._product:
return "unknown"
return self._product

@property
Expand All @@ -226,6 +231,8 @@ def manufacturer(self) -> str:
Override this method to return proper name directly from device itself.
"""
if not self._manufacturer:
return "unknown"
return self._manufacturer

@property
Expand All @@ -237,6 +244,8 @@ def name(self) -> str:
Override this method to return proper name directly from device itself.
"""
if not self._name:
return "unknown"
return self._name

@property
Expand All @@ -248,6 +257,8 @@ def serial(self) -> str:
Override this method to return proper name directly from device itself.
"""
if not self._serial:
return "unknown"
return self._serial

@property
Expand All @@ -266,6 +277,8 @@ def description(self) -> str:
prod = self.name
elif self.serial and self.serial != "unknown":
prod = self.serial
elif self.parent_device is not None:
return f"partition of {self.parent_device}"
else:
prod = f"unknown {self.devclass if self.devclass else ''} device"

Expand All @@ -285,6 +298,8 @@ def interfaces(self) -> List[DeviceInterface]:
Every device should have at least one interface.
"""
if not self._interfaces:
return [DeviceInterface.Other]
return self._interfaces

@property
Expand All @@ -295,7 +310,10 @@ def parent_device(self) -> Optional['DeviceInfo']:
If the device is part of another device (e.g. it's a single
partition of an usb stick), the parent device id should be here.
"""
return None
if self._parent is None:
return None
return self.backend_domain.devices.get(
self._parent.devclass, {}).get(self._parent.ident, None)

@property
def subdevices(self) -> List['DeviceInfo']:
Expand All @@ -306,7 +324,7 @@ def subdevices(self) -> List['DeviceInfo']:
the subdevices id should be here.
"""
return [dev for dev in self.backend_domain.devices[self.devclass]
if dev.parent_device == self.ident]
if dev.parent_device.ident == self.ident]

# @property
# def port_id(self) -> str:
Expand All @@ -326,36 +344,37 @@ def serialize(self) -> bytes:
"""
Serialize object to be transmitted via Qubes API.
"""
# 'backend_domain' and 'interfaces' are not string, so they need
# special treatment
# 'backend_domain', 'interfaces', 'data', 'parent_device'
# are not string, so they need special treatment
default_attrs = {
'ident', 'devclass', 'vendor', 'product', 'manufacturer', 'name',
'serial', }
# to_skip_attrs = {
# 'parent_device', 'description', 'regex', 'parent_device',
# 'attachments', 'subdevices', 'serialize', 'deserialize'}
# rest_attrs = set(
# attr for attr in dir(self)
# if not attr.startswith('_')).difference(
# default_attrs + to_skip_attrs)
'serial'}
properties = b' '.join(
base64.b64encode(f'{prop}={value!s}'.encode('ascii'))
for prop, value in itertools.chain(
((key, getattr(self, key)) for key in default_attrs),
# # keep description as the last one, according to API
# # specification
# (('description', self.description),)
))
for prop, value in (
(key, getattr(self, key)) for key in default_attrs)
)

backend_domain_name = self.backend_domain.name
backend_domain_prop = (b'backend_domain=' +
backend_domain_name.encode('ascii'))
properties += b' ' + base64.b64encode(backend_domain_prop)

interfaces = ''.join(ifc.value for ifc in self._interfaces)
interfaces = ''.join(ifc.value for ifc in self.interfaces)
interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii')

properties += b' ' + base64.b64encode(interfaces_prop)

if self.parent_device is not None:
parent_prop = b'parent=' + self.parent_device.ident.encode('ascii')
properties += b' ' + base64.b64encode(parent_prop)

data = b' '.join(
base64.b64encode(f'_{prop}={value!s}'.encode('ascii'))
for prop, value in ((key, self.data[key]) for key in self.data)
)
if data:
properties += b' ' + data

return properties

@classmethod
Expand All @@ -366,13 +385,12 @@ def deserialize(
expected_devclass: Optional[str] = None,
) -> 'DeviceInfo':
try:
ident, _, serialization = serialization.partition(b' ')
result = DeviceInfo._deserialize(
cls, serialization, expected_backend_domain, expected_devclass)
except Exception as exc:
print(exc, file=sys.stderr)
print(exc, file=sys.stderr) # TODO
ident = serialization.split(b' ')[0].decode(
'ascii', errors='ignore') # TODO
'ascii', errors='ignore')
result = UnknownDevice(
backend_domain=expected_backend_domain,
ident=ident,
Expand All @@ -389,12 +407,15 @@ def _deserialize(
) -> 'DeviceInfo':
properties_str = [
base64.b64decode(line).decode('ascii', errors='ignore')
for line in serialization.split(b' ')]
for line in serialization.split(b' ')[1:]]

properties = dict()
for line in properties_str:
key, _, param = line.partition("=")
properties[key] = param
if key.startswith("_"):
properties[key[1:]] = param
else:
properties[key] = param

if properties['backend_domain'] != expected_backend_domain.name:
raise ValueError("TODO") # TODO
Expand All @@ -408,6 +429,12 @@ def _deserialize(
for i in range(0, len(interfaces), 6)]
properties['interfaces'] = interfaces

if 'parent' in properties:
properties['parent'] = Device(
backend_domain=expected_backend_domain,
ident=properties['parent']
)

return cls(**properties)

@property
Expand Down

0 comments on commit a64ca39

Please sign in to comment.