From c6ba8a4b6f10bf2a363e420e8a14f90625842b4e Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Thu, 15 Aug 2024 18:07:22 +0200 Subject: [PATCH] q-dev: device -> devices --- qubesadmin/device_protocol.py | 87 +++++++++++++++++----------- qubesadmin/tests/devices.py | 4 +- qubesadmin/tests/tools/qvm_device.py | 38 ++++++------ qubesadmin/tools/qvm_device.py | 60 ++++++++++++------- 4 files changed, 114 insertions(+), 75 deletions(-) diff --git a/qubesadmin/device_protocol.py b/qubesadmin/device_protocol.py index cc5fae73..98e82510 100644 --- a/qubesadmin/device_protocol.py +++ b/qubesadmin/device_protocol.py @@ -197,6 +197,7 @@ def parse_basic_device_properties( f"Unrecognized device identity '{properties['device_id']}' " f"expected '{expected_device.device_id}'" ) + expected._device_id = properties.get('device_id', expected_devid) properties['port'] = expected @@ -339,13 +340,21 @@ def devclass(self) -> str: class VirtualDevice: + """ + Class of a device connected to *port*. + + Attributes: + port (Port): A unique identifier for the port within the backend domain. + device_id (str): A unique identifier for the device. + """ def __init__( self, port: Optional[Port] = None, device_id: Optional[str] = None, ): + # TODO! one of them cannot be None self.port: Optional[Port] = port - self._device_id = device_id if device_id else '*' + self._device_id = device_id def clone(self, **kwargs): """ @@ -368,29 +377,33 @@ def port(self, value): @property def device_id(self): - return self._device_id - - @device_id.setter - def device_id(self, value): - self._device_id = value if value else '*' + if self._device_id is not None: + return self._device_id + return '*' @property def backend_domain(self): - if self.port != '*': + if self.port != '*' and self.port.backend_domain is not None: return self.port.backend_domain - return None + return '*' @property def port_id(self): - if self.port != '*': + if self.port != '*' and self.port.port_id is not None: return self.port.port_id - return None + return '*' @property def devclass(self): - if self.port != '*': + if self.port != '*' and self.port.devclass is not None: return self.port.devclass - return None + return '*' + + @property + def description(self): + if self.device_id == '*': + return 'any device' + return self.device_id def __hash__(self): return hash((self.port, self.device_id)) @@ -1026,8 +1039,8 @@ def __init__( mode: Union[str, AssignmentMode] = "manual", ): if isinstance(device, DeviceInfo): - device = VirtualDevice(device.port, device.device_id) - self._device_ident = device + device = VirtualDevice(device.port, device._device_id) + self.virtual_device = device self.__options = options or {} if isinstance(mode, AssignmentMode): self.mode = mode @@ -1039,12 +1052,8 @@ def clone(self, **kwargs): """ Clone object and substitute attributes with explicitly given. """ - kwargs["device"] = kwargs.get( - "device", VirtualDevice( - Port(self.backend_domain, self.port_id, self.devclass), - self.device_id - )) attr = { + "device": self.virtual_device, "options": self.options, "mode": self.mode, "frontend_domain": self.frontend_domain, @@ -1053,13 +1062,13 @@ def clone(self, **kwargs): return self.__class__(**attr) def __repr__(self): - return f"{self._device_ident!r}" + return f"{self.virtual_device!r}" def __str__(self): - return f"{self._device_ident}" + return f"{self.virtual_device}" def __hash__(self): - return hash(self._device_ident) + return hash(self.virtual_device) def __eq__(self, other): if isinstance(other, (VirtualDevice, DeviceAssignment)): @@ -1072,35 +1081,40 @@ def __eq__(self, other): def __lt__(self, other): if isinstance(other, DeviceAssignment): - return self._device_ident < other._device_ident + return self.virtual_device < other.virtual_device if isinstance(other, VirtualDevice): - return self._device_ident < other + return self.virtual_device < other raise TypeError( f"Comparing instances of {type(self)} and '{type(other)}' " "is not supported") @property def backend_domain(self): - return self._device_ident.port.backend_domain + return self.virtual_device.port.backend_domain @property def port_id(self): - return self._device_ident.port.port_id + return self.virtual_device.port.port_id @property def devclass(self): - return self._device_ident.port.devclass + return self.virtual_device.port.devclass @property def device_id(self): - return self._device_ident.device_id + return self.virtual_device.device_id @property - def device(self) -> DeviceInfo: + def devices(self) -> List[DeviceInfo]: """Get DeviceInfo object corresponding to this DeviceAssignment""" - dev = self.backend_domain.devices[self.devclass][self.port_id] - # TODO: device identity could not match - return dev + if self.port_id != '*': + # could return UnknownDevice + return [self.backend_domain.devices[self.devclass][self.port_id]] + result = [] + for dev in self.backend_domain.devices[self.devclass]: + if dev.device_id == self.device_id: + result.append(dev) + return result @property def port(self) -> Port: @@ -1130,7 +1144,10 @@ def attached(self) -> bool: Returns False if device is attached to different domain """ - return self.device.attachment == self.frontend_domain + for device in self.devices: + if device.attachment == self.frontend_domain: + return True + return False @property def required(self) -> bool: @@ -1166,7 +1183,7 @@ def serialize(self) -> bytes: """ Serialize an object to be transmitted via Qubes API. """ - properties = self._device_ident.serialize() + properties = self.virtual_device.serialize() properties += b' ' + DeviceSerializer.pack_property( 'mode', self.mode.value) if self.frontend_domain is not None: @@ -1210,6 +1227,8 @@ def _deserialize( expected_device, properties) # we do not need port, we need device del properties['port'] + expected_device._device_id = properties.get( + 'device_id', expected_device.device_id) properties.pop('device_id', None) properties['device'] = expected_device diff --git a/qubesadmin/tests/devices.py b/qubesadmin/tests/devices.py index 53b108f7..33b8a3f0 100644 --- a/qubesadmin/tests/devices.py +++ b/qubesadmin/tests/devices.py @@ -204,7 +204,7 @@ def test_040_dedicated(self): self.app.domains['test-vm']) self.assertEqual(dedicated[0].options, {}) self.assertEqual(dedicated[0].devclass, 'test') - self.assertEqual(dedicated[0].device, + self.assertEqual(dedicated[0].devices[0], self.app.domains['test-vm2'].devices['test']['dev1']) self.assertIsInstance(dedicated[1], DeviceAssignment) @@ -215,7 +215,7 @@ def test_040_dedicated(self): self.app.domains['test-vm']) self.assertEqual(dedicated[1].options, {}) self.assertEqual(dedicated[1].devclass, 'test') - self.assertEqual(dedicated[1].device, + self.assertEqual(dedicated[1].devices[0], self.app.domains['test-vm3'].devices['test']['dev2']) self.assertAllCalled() diff --git a/qubesadmin/tests/tools/qvm_device.py b/qubesadmin/tests/tools/qvm_device.py index 8aee0a6b..008bae4b 100644 --- a/qubesadmin/tests/tools/qvm_device.py +++ b/qubesadmin/tests/tools/qvm_device.py @@ -45,8 +45,9 @@ def setUp(self): b'test-vm3 class=AppVM state=Running\n') self.expected_device_call( 'test-vm1', 'Available', - b"0\0dev1 port_id='dev1' devclass='testclass' vendor='itl'" - b" product='test-device' backend_domain='test-vm1'" + b"0\0dev1 device_id='dead:beef:babe:u0123456' " + b"port_id='dev1' devclass='testclass' vendor='itl' " + b"product='test-device' backend_domain='test-vm1'" ) self.vm1 = self.app.domains['test-vm1'] self.vm2 = self.app.domains['test-vm2'] @@ -86,10 +87,13 @@ def test_001_list_assigned_required(self): # This shouldn't be listed self.expected_device_call( 'test-vm2', 'Available', - b"0\0dev2 port_id='dev2' devclass='testclass' backend_domain='test-vm2'\n") + b"0\0dev2 device_id='serial' port_id='dev2' " + b"devclass='testclass' backend_domain='test-vm2'\n") self.expected_device_call( 'test-vm3', 'Available', - b"0\0dev3 port_id='dev3' devclass='testclass' backend_domain='test-vm3' vendor='evil inc.' product='test-device-3'\n" + b"0\0dev3 port_id='dev3' device_id='0000:0000::?******' " + b"devclass='testclass' backend_domain='test-vm3' " + b"vendor='evil inc.' product='test-device-3'\n" ) self.expected_device_call('test-vm1', 'Attached') self.expected_device_call('test-vm2', 'Attached') @@ -100,8 +104,8 @@ def test_001_list_assigned_required(self): b"0\0test-vm1+dev1 port_id='dev1' devclass='testclass' " b"backend_domain='test-vm1' " b"mode='required' _option='other option' _extra_opt='yes'\n" - b"test-vm3+dev3 port_id='dev3' devclass='testclass' " - b"backend_domain='test-vm3' mode='required'\n" + b"test-vm3+dev3 device_id='0000:0000::?******' port_id='dev3' " + b"devclass='testclass' backend_domain='test-vm3' mode='required'\n" ) self.expected_device_call( 'test-vm3', 'Assigned', @@ -114,7 +118,7 @@ def test_001_list_assigned_required(self): ['testclass', 'list', 'test-vm3'], app=self.app) self.assertEqual( buf.getvalue(), - 'test-vm1:dev1 ?******: itl test-device ' + 'test-vm1:dev1 any device ' 'test-vm2 (option=other option, extra_opt=yes), ' 'test-vm3 (option=test option)\n' 'test-vm3:dev3 ?******: evil inc. test-device-3 test-vm2\n' @@ -167,8 +171,8 @@ def test_010_attach(self): """ Test attach action """ self.app.expected_calls[( 'test-vm2', 'admin.vm.device.testclass.Attach', - 'test-vm1+dev1:*', - b"device_id='*' port_id='dev1' " + 'test-vm1+dev1:dead:beef:babe:u0123456', + b"device_id='dead:beef:babe:u0123456' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' mode='manual' " b"frontend_domain='test-vm2'")] = b'0\0' qubesadmin.tools.qvm_device.main( @@ -179,8 +183,8 @@ def test_011_attach_options(self): """ Test `read-only` attach option """ self.app.expected_calls[( 'test-vm2', 'admin.vm.device.testclass.Attach', - 'test-vm1+dev1:*', - b"device_id='*' port_id='dev1' " + 'test-vm1+dev1:dead:beef:babe:u0123456', + b"device_id='dead:beef:babe:u0123456' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' mode='manual' " b"frontend_domain='test-vm2' _read-only='yes'")] = b'0\0' qubesadmin.tools.qvm_device.main( @@ -259,8 +263,8 @@ def test_030_assign(self): self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( 'test-vm2', 'admin.vm.device.testclass.Assign', - 'test-vm1+dev1:0000:0000::?******', - b"device_id='0000:0000::?******' port_id='dev1' " + 'test-vm1+dev1:dead:beef:babe:u0123456', + b"device_id='dead:beef:babe:u0123456' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' " b"mode='auto-attach' frontend_domain='test-vm2'" )] = b'0\0' @@ -273,8 +277,8 @@ def test_031_assign_required(self): self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( 'test-vm2', 'admin.vm.device.testclass.Assign', - 'test-vm1+dev1:0000:0000::?******', - b"device_id='0000:0000::?******' port_id='dev1' " + 'test-vm1+dev1:dead:beef:babe:u0123456', + b"device_id='dead:beef:babe:u0123456' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' mode='required' " b"frontend_domain='test-vm2'" )] = b'0\0' @@ -287,8 +291,8 @@ def test_032_assign_options(self): self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( 'test-vm2', 'admin.vm.device.testclass.Assign', - 'test-vm1+dev1:0000:0000::?******', - b"device_id='0000:0000::?******' port_id='dev1' " + 'test-vm1+dev1:dead:beef:babe:u0123456', + b"device_id='dead:beef:babe:u0123456' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' " b"mode='auto-attach' frontend_domain='test-vm2' _read-only='yes'" )] = b'0\0' diff --git a/qubesadmin/tools/qvm_device.py b/qubesadmin/tools/qvm_device.py index e10868fe..ec01aed1 100644 --- a/qubesadmin/tools/qvm_device.py +++ b/qubesadmin/tools/qvm_device.py @@ -111,8 +111,13 @@ def _load_devices(app, domains, devclass): try: for vm in domains: try: - for ass in vm.devices[devclass].get_dedicated_devices(): - devices.add(ass.device) + for ass in vm.devices[devclass].get_attached_devices(): + if len(ass.devices) > 1: + print("There is to many devices in assignment", + file=sys.stderr) + devices.add(ass.devices[0]) + for ass in vm.devices[devclass].get_assigned_devices(): + devices.add(ass.virtual_device) for dev in vm.devices[devclass].get_exposed_devices(): devices.add(dev) except qubesadmin.exc.QubesVMNotFoundError: @@ -134,19 +139,25 @@ def _load_frontends_info(vm, dev, devclass): return try: - for assignment in vm.devices[devclass].get_dedicated_devices(): - if dev != assignment.device: - continue - if assignment.options: - yield '{!s} ({})'.format( - vm, ', '.join('{}={}'.format(key, value) - for key, value in assignment.options.items())) - else: - yield str(vm) + for assignment in vm.devices[devclass].get_attached_devices(): + if dev in assignment.devices: + yield _frontend_desc(vm, assignment) + for assignment in vm.devices[devclass].get_assigned_devices(): + if dev == assignment.virtual_device: + yield _frontend_desc(vm, assignment) except qubesadmin.exc.QubesVMNotFoundError: pass +def _frontend_desc(vm, assignment): + if assignment.options: + return '{!s} ({})'.format( + vm, ', '.join('{}={}'.format(key, value) + for key, value in assignment.options.items())) + else: + return str(vm) + + def attach_device(args): """ Called by the parser to execute the :program:`qvm-devices attach` subcommand. @@ -194,6 +205,8 @@ def detach_device(args): vm = args.domains[0] if args.device: device = args.device + # ignore device id, detach any device + device.device_id = None assignment = DeviceAssignment(device) vm.devices[args.devclass].detach(assignment) else: @@ -207,7 +220,7 @@ def assign_device(args): """ vm = args.domains[0] device = args.device - if not args.port: + if args.port: device.device_id = None options = dict(opt.split('=', 1) for opt in args.option or []) if args.ro: @@ -237,6 +250,8 @@ def unassign_device(args): vm = args.domains[0] if args.device: device = args.device + # ignore device id, detach any device + device.device_id = None assignment = DeviceAssignment( device, frontend_domain=vm) _unassign_and_show_message(assignment, vm, args) @@ -266,11 +281,12 @@ def info_device(args): print("description:", device.description) print("data:", device.data) else: - for device_assignment in ( - vm.devices[args.devclass].get_dedicated_devices()): - print("device_assignment:", device_assignment) - print("description:", device_assignment.device.description) - print("data:", device_assignment.device.data) + for assignment in (vm.devices[args.devclass].get_dedicated_devices()): + print("device_assignment:", assignment) + if len(assignment.devices) == 1: + device = assignment.devices[0] + print("description:", device.description) + print("data:", device.data) def init_list_parser(sub_parsers): @@ -310,7 +326,7 @@ def parse_qubes_app(self, parser, namespace): return try: - vmname, device_id = backend_device_id.split(':', 1) + vmname, port_id = backend_device_id.split(':', 1) vm = None try: vm = app.domains[vmname] @@ -318,15 +334,15 @@ def parse_qubes_app(self, parser, namespace): parser.error_runtime("no backend vm {!r}".format(vmname)) try: - dev = vm.devices[devclass][device_id] + dev = vm.devices[devclass][port_id] if not self.allow_unknown and \ isinstance(dev, UnknownDevice): - raise KeyError(device_id) + raise KeyError(port_id) except KeyError: parser.error_runtime( f"backend vm {vmname!r} doesn't expose " - f"{devclass} device {device_id!r}") - dev = Port(vm, device_id, devclass) + f"{devclass} device {port_id!r}") + dev = UnknownDevice(Port(vm, port_id, devclass)) setattr(namespace, self.dest, dev) except ValueError: parser.error(