From 247dca6141ef61944bbbfd7cf6aad9da12bd293f Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Wed, 14 Aug 2024 22:15:27 +0200 Subject: [PATCH 01/19] q-dev: implement device_id --- qubes_config/tests/test_usb_devices.py | 24 +++++++++--------------- qui/decorators.py | 2 +- qui/devices/backend.py | 12 +++++++++--- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/qubes_config/tests/test_usb_devices.py b/qubes_config/tests/test_usb_devices.py index c766200c..b1905fe4 100644 --- a/qubes_config/tests/test_usb_devices.py +++ b/qubes_config/tests/test_usb_devices.py @@ -845,15 +845,13 @@ def test_u2f_handler_add_without_service(test_qapp, def test_devices_handler_unsaved(test_qapp, test_policy_manager, real_builder): test_qapp.expected_calls[('sys-usb', "admin.vm.device.pci.Attached", None, None)] = \ - b"0\x00dom0+00_0d.0 ident='00_0d.0' devclass='pci' " \ + b"0\x00dom0+00_0d.0 device_id='*' port_id='00_0d.0' devclass='pci' " \ b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('dom0', "admin.vm.device.pci.Available", None, None)] = \ - b"0\x0000_0d.0 ident='00_0d.0' devclass='pci' backend_domain='dom0' " \ - b"serial='unknown' manufacturer='unknown' " \ - b"self_identity='0000:0000::p0c0300' vendor='unknown' " \ - b"product='unknown' name='unknown' interfaces='p0c0300' " \ + b"0\x0000_0d.0 device_id='0000:0000::p0c0300' port_id='00_0d.0' " \ + b"devclass='pci' backend_domain='dom0' interfaces='p0c0300' " \ b"_function='0' _bus='00' _libvirt_name='pci_0000_00_0d_0' " \ b"_device='0d'\n" @@ -878,26 +876,22 @@ def test_devices_handler_detect_usbvms(test_qapp, test_policy_manager, real_builder): test_qapp.expected_calls[('sys-usb', "admin.vm.device.pci.Attached", None, None)] = \ - b"0\x00dom0+00_0d.0 ident='00_0d.0' devclass='pci' " \ + b"0\x00dom0+00_0d.0 device_id='*' port_id='00_0d.0' devclass='pci' " \ b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('test-standalone', "admin.vm.device.pci.Attached", None, None)] = \ - b"0\x00dom0+00_0f.0 ident='00_0f.0' devclass='pci' " \ + b"0\x00dom0+00_0f.0 device_id='*' port_id='00_0f.0' devclass='pci' " \ b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('dom0', "admin.vm.device.pci.Available", None, None)] = \ - b"0\x0000_0f.0 ident='00_0f.0' devclass='pci' backend_domain='dom0' " \ - b"serial='unknown' manufacturer='unknown' " \ - b"self_identity='0000:0000::p0c0300' vendor='unknown' " \ - b"product='unknown' name='unknown' interfaces='p0c0300' " \ + b"0\x0000_0f.0 device_id='0000:0000::p0c0300' port_id='00_0f.0' " \ + b"devclass='pci' backend_domain='dom0' interfaces='p0c0300' " \ b"_function='0' _bus='00' _libvirt_name='pci_0000_00_0f_0' " \ b"_device='0f'\n" \ - b"00_0d.0 ident='00_0d.0' devclass='pci' backend_domain='dom0' " \ - b"serial='unknown' manufacturer='unknown' " \ - b"self_identity='0000:0000::p0c0300' vendor='unknown' " \ - b"product='unknown' name='unknown' interfaces='p0c0300' " \ + b"00_0d.0 device_id='0000:0000::p0c0300' port_id='00_0d.0' " \ + b"devclass='pci' backend_domain='dom0' interfaces='p0c0300' " \ b"_function='0' _bus='00' _libvirt_name='pci_0000_00_0d_0' " \ b"_device='0d'\n" diff --git a/qui/decorators.py b/qui/decorators.py index 41b46491..8a76fb6a 100644 --- a/qui/decorators.py +++ b/qui/decorators.py @@ -247,7 +247,7 @@ def device_hbox(device) -> Gtk.Box: dev_icon = create_icon(icon) name_label = Gtk.Label(xalign=0) - name = f"{device.backend_domain}:{device.ident} - {device.description}" + name = f"{device.backend_domain}:{device.port_id} - {device.description}" if device.attachments: dev_list = ", ".join(list(device.attachments)) name_label.set_markup(f'{name} ({dev_list})') diff --git a/qui/devices/backend.py b/qui/devices/backend.py index 449f7d21..58b3b8d7 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -107,7 +107,7 @@ def __init__(self, dev: qubesadmin.devices.DeviceInfo, if dev.devclass == 'block' and 'size' in dev.data: self._dev_name += " (" + size_to_human(int(dev.data['size'])) + ")" - self._ident: str = getattr(dev, 'ident', 'unknown') + self._ident: str = getattr(dev, 'port_id', 'unknown') self._description: str = getattr(dev, 'description', 'unknown') self._devclass: str = getattr(dev, 'devclass', 'unknown') self._data: Dict = getattr(dev, 'data', {}) @@ -232,7 +232,10 @@ def attach_to_vm(self, vm: VM): """ try: assignment = qubesadmin.device_protocol.DeviceAssignment( - self.backend_domain, self.id_string) + qubesadmin.device_protocol.Device( + qubesadmin.device_protocol.Port( + self.backend_domain, self.id_string, self.device_class) + )) vm.vm_object.devices[self.device_class].attach(assignment) self.gtk_app.emit_notification( @@ -263,7 +266,10 @@ def detach_from_vm(self, vm: VM): notification_id=self.notification_id) try: assignment = qubesadmin.device_protocol.DeviceAssignment( - self.backend_domain, self._ident) + qubesadmin.device_protocol.Device( + qubesadmin.device_protocol.Port( + self.backend_domain, self._ident, self.device_class) + )) vm.vm_object.devices[self.device_class].detach(assignment) except qubesadmin.exc.QubesException as ex: self.gtk_app.emit_notification( From 8ead9c259bda29a2b72122c228e063a5fd59cb4d Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Thu, 15 Aug 2024 10:21:21 +0200 Subject: [PATCH 02/19] q-dev: fix detaching --- qubes_config/tests/test_usb_devices.py | 6 +++--- qui/devices/backend.py | 4 +++- qui/devices/device_widget.py | 24 +++++++++++++----------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/qubes_config/tests/test_usb_devices.py b/qubes_config/tests/test_usb_devices.py index b1905fe4..40d73eb6 100644 --- a/qubes_config/tests/test_usb_devices.py +++ b/qubes_config/tests/test_usb_devices.py @@ -846,7 +846,7 @@ def test_devices_handler_unsaved(test_qapp, test_policy_manager, real_builder): test_qapp.expected_calls[('sys-usb', "admin.vm.device.pci.Attached", None, None)] = \ b"0\x00dom0+00_0d.0 device_id='*' port_id='00_0d.0' devclass='pci' " \ - b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ + b"backend_domain='dom0' mode='required' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('dom0', "admin.vm.device.pci.Available", None, None)] = \ @@ -877,12 +877,12 @@ def test_devices_handler_detect_usbvms(test_qapp, test_qapp.expected_calls[('sys-usb', "admin.vm.device.pci.Attached", None, None)] = \ b"0\x00dom0+00_0d.0 device_id='*' port_id='00_0d.0' devclass='pci' " \ - b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ + b"backend_domain='dom0' mode='required' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('test-standalone', "admin.vm.device.pci.Attached", None, None)] = \ b"0\x00dom0+00_0f.0 device_id='*' port_id='00_0f.0' devclass='pci' " \ - b"backend_domain='dom0' required='yes' attach_automatically='yes' " \ + b"backend_domain='dom0' mode='required' " \ b"_no-strict-reset='yes'\n" test_qapp.expected_calls[('dom0', "admin.vm.device.pci.Available", None, None)] = \ diff --git a/qui/devices/backend.py b/qui/devices/backend.py index 58b3b8d7..ef0452d4 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -111,6 +111,7 @@ def __init__(self, dev: qubesadmin.devices.DeviceInfo, self._description: str = getattr(dev, 'description', 'unknown') self._devclass: str = getattr(dev, 'devclass', 'unknown') self._data: Dict = getattr(dev, 'data', {}) + self._device_id = getattr(dev, 'device_id', '*') self.attachments: Set[VM] = set() backend_domain = getattr(dev, 'backend_domain', None) if backend_domain: @@ -234,7 +235,8 @@ def attach_to_vm(self, vm: VM): assignment = qubesadmin.device_protocol.DeviceAssignment( qubesadmin.device_protocol.Device( qubesadmin.device_protocol.Port( - self.backend_domain, self.id_string, self.device_class) + self.backend_domain, self.id_string, self.device_class), + device_id=self._device_id, )) vm.vm_object.devices[self.device_class].attach(assignment) diff --git a/qui/devices/device_widget.py b/qui/devices/device_widget.py index 44d0fd9a..fe034b4d 100644 --- a/qui/devices/device_widget.py +++ b/qui/devices/device_widget.py @@ -124,7 +124,8 @@ def device_list_update(self, vm, _event, **_kwargs): try: for devclass in DEV_TYPES: for device in vm.devices[devclass]: - changed_devices[str(device)] = backend.Device(device, self) + changed_devices[str(device.port)] = backend.Device( + device, self) except qubesadmin.exc.QubesException: changed_devices = {} # VM was removed @@ -172,7 +173,8 @@ def initialize_dev_data(self): for devclass in DEV_TYPES: try: for device in domain.devices[devclass]: - self.devices[str(device)] = backend.Device(device, self) + self.devices[str(device.port)] = backend.Device( + device, self) except qubesadmin.exc.QubesException: # we have no permission to access VM's devices continue @@ -183,7 +185,7 @@ def initialize_dev_data(self): try: for device in domain.devices[devclass ].get_attached_devices(): - dev = str(device) + dev = str(device.port) if dev in self.devices: # occassionally ghost UnknownDevices appear when a # device was removed but not detached from a VM @@ -202,14 +204,14 @@ def device_attached(self, vm, _event, device, **_kwargs): # we don't have access to VM state return - if str(device) not in self.devices: - self.devices[str(device)] = backend.Device(device, self) + if str(device.port) not in self.devices: + self.devices[str(device.port)] = backend.Device(device, self) vm_wrapped = backend.VM(vm) - self.devices[str(device)].attachments.add(vm_wrapped) + self.devices[str(device.port)].attachments.add(vm_wrapped) - def device_detached(self, vm, _event, device, **_kwargs): + def device_detached(self, vm, _event, port, **_kwargs): try: if not vm.is_running(): return @@ -217,11 +219,11 @@ def device_detached(self, vm, _event, device, **_kwargs): # we don't have access to VM state return - device = str(device) + port = str(port) vm_wrapped = backend.VM(vm) - if device in self.devices: - self.devices[device].attachments.discard(vm_wrapped) + if port in self.devices: + self.devices[port].attachments.discard(vm_wrapped) def vm_start(self, vm, _event, **_kwargs): wrapped_vm = backend.VM(vm) @@ -231,7 +233,7 @@ def vm_start(self, vm, _event, **_kwargs): for devclass in DEV_TYPES: try: for device in vm.devices[devclass].get_attached_devices(): - dev = str(device) + dev = str(device.port) if dev in self.devices: self.devices[dev].attachments.add(wrapped_vm) except qubesadmin.exc.QubesDaemonAccessError: From 070406c79500745bcf590bc9b2ec482f8941e428 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 19 Aug 2024 07:59:53 +0200 Subject: [PATCH 03/19] q-dev: virtual device --- qui/devices/backend.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/qui/devices/backend.py b/qui/devices/backend.py index ef0452d4..c06f7082 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -24,6 +24,8 @@ import qubesadmin.devices import qubesadmin.vm from qubesadmin.utils import size_to_human +from qubesadmin.device_protocol import (Port, VirtualDevice, DeviceInfo, + DeviceAssignment) import gi gi.require_version('Gtk', '3.0') # isort:skip @@ -232,12 +234,9 @@ def attach_to_vm(self, vm: VM): Perform attachment to provided VM. """ try: - assignment = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Device( - qubesadmin.device_protocol.Port( - self.backend_domain, self.id_string, self.device_class), - device_id=self._device_id, - )) + assignment = DeviceAssignment(VirtualDevice(Port( + self.backend_domain, self.id_string, self.device_class), + device_id=self._device_id)) vm.vm_object.devices[self.device_class].attach(assignment) self.gtk_app.emit_notification( @@ -267,11 +266,8 @@ def detach_from_vm(self, vm: VM): Gio.NotificationPriority.NORMAL, notification_id=self.notification_id) try: - assignment = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Device( - qubesadmin.device_protocol.Port( - self.backend_domain, self._ident, self.device_class) - )) + assignment = DeviceAssignment(VirtualDevice(Port( + self.backend_domain, self._ident, self.device_class))) vm.vm_object.devices[self.device_class].detach(assignment) except qubesadmin.exc.QubesException as ex: self.gtk_app.emit_notification( From 40b3736b7c64bea8d8895d3987c05b35640c3d13 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 20 Aug 2024 08:27:34 +0200 Subject: [PATCH 04/19] q-dev: cleanup --- qui/devices/backend.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/qui/devices/backend.py b/qui/devices/backend.py index c06f7082..0b622992 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -24,8 +24,7 @@ import qubesadmin.devices import qubesadmin.vm from qubesadmin.utils import size_to_human -from qubesadmin.device_protocol import (Port, VirtualDevice, DeviceInfo, - DeviceAssignment) +from qubesadmin.device_protocol import Port, VirtualDevice, DeviceAssignment import gi gi.require_version('Gtk', '3.0') # isort:skip From 1d34246909720a9d3406ae53949974f9a8fa6f22 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 20 Aug 2024 08:37:07 +0200 Subject: [PATCH 05/19] q-dev: move gui parts from core-admin --- Makefile | 3 + autostart/qubes-device-agent.desktop | 7 + qui/devices/AttachConfirmationWindow.glade | 359 ++++++++++ qui/devices/qubes-device-agent-autostart | 9 + qui/tools/__init__.py | 20 + qui/tools/attach_confirm.py | 100 +++ qui/tools/qubes_device_agent.py | 714 +++++++++++++++++++ rpm_spec/qubes-desktop-linux-manager.spec.in | 12 + setup.py | 33 +- 9 files changed, 1256 insertions(+), 1 deletion(-) create mode 100644 autostart/qubes-device-agent.desktop create mode 100644 qui/devices/AttachConfirmationWindow.glade create mode 100755 qui/devices/qubes-device-agent-autostart create mode 100644 qui/tools/__init__.py create mode 100755 qui/tools/attach_confirm.py create mode 100644 qui/tools/qubes_device_agent.py diff --git a/Makefile b/Makefile index 9d552f33..78adc9c2 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,7 @@ install-icons: install-autostart: mkdir -p $(DESTDIR)/etc/xdg/autostart + cp autostart/qubes-device-agent.desktop $(DESTDIR)/etc/xdg/autostart cp autostart/qui-domains.desktop $(DESTDIR)/etc/xdg/autostart cp autostart/qui-devices.desktop $(DESTDIR)/etc/xdg/autostart cp autostart/qui-clipboard.desktop $(DESTDIR)/etc/xdg/autostart @@ -48,6 +49,8 @@ install-autostart: cp desktop/qubes-global-config.desktop $(DESTDIR)/usr/share/applications/ cp desktop/qubes-new-qube.desktop $(DESTDIR)/usr/share/applications/ cp desktop/qubes-policy-editor-gui.desktop $(DESTDIR)/usr/share/applications/ + install -d $(DESTDIR)/usr/lib/qubes -m 0755 + install -m 0755 qui/devices/qubes-device-agent-autostart $(DESTDIR)/usr/lib/qubes/qubes-device-agent-autostart install-lang: mkdir -p $(DESTDIR)/usr/share/gtksourceview-4/language-specs/ diff --git a/autostart/qubes-device-agent.desktop b/autostart/qubes-device-agent.desktop new file mode 100644 index 00000000..1cb9c9ec --- /dev/null +++ b/autostart/qubes-device-agent.desktop @@ -0,0 +1,7 @@ +[Desktop Entry] +Name=Qubes Device Agent +Comment=Agent for handling device attach confirmation prompts +Icon=qubes +Exec=/usr/lib/qubes/qubes-device-agent-autostart +Terminal=false +Type=Application diff --git a/qui/devices/AttachConfirmationWindow.glade b/qui/devices/AttachConfirmationWindow.glade new file mode 100644 index 00000000..d8c0df42 --- /dev/null +++ b/qui/devices/AttachConfirmationWindow.glade @@ -0,0 +1,359 @@ + + + + + + 400 + False + Device attachment + center + dialog-question + dialog + True + center + + + True + False + vertical + + + True + False + True + error + True + + + False + 6 + end + + + + + + False + False + 0 + + + + + False + 16 + + + True + False + gtk-dialog-error + + + False + True + 0 + + + + + True + False + ErrorMessage + + + False + True + 1 + + + + + False + False + 0 + + + + + False + True + 0 + + + + + 100 + 80 + True + False + 12 + 12 + 12 + 12 + True + vertical + 6 + + + True + False + 6 + end + + + gtk-cancel + True + True + True + True + + + True + True + 0 + + + + + gtk-ok + True + False + True + True + True + True + + + True + True + 1 + + + + + False + True + end + 1 + + + + + True + False + vertical + 6 + + + True + False + 6 + 12 + + + True + False + gtk-dialog-question + 6 + + + False + True + 0 + + + + + True + False + start + Do you want to attach the following device? +<small>Select the target domain and confirm with 'OK'</small> + True + + + True + True + 1 + + + + + False + True + 0 + + + + + True + False + 12 + 6 + 12 + 6 + + + True + False + 1 + Target: + + + 0 + 2 + + + + + True + False + True + True + + + True + 5 + False + Start typing or use the arrow + GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE + + + + + 1 + 2 + + + + + True + False + 1 + Source: + + + 0 + 0 + + + + + True + False + False + source + False + + + 1 + 0 + + + + + True + False + 1 + Device: + + + 0 + 1 + + + + + True + False + 0 + qubes.<b>MyOperation</b> + True + + + 1 + 1 + + + + + False + True + 1 + + + + + True + True + + + True + False + 6 + vertical + 6 + + + Display templates in the target list + True + True + False + 0 + True + + + False + True + 0 + + + + + Choose a custom destination in the target + True + True + False + 0 + True + + + False + True + 1 + + + + + + + False + Advanced options + True + + + + + False + True + 2 + + + + + True + True + 2 + + + + + False + True + 1 + + + + + + diff --git a/qui/devices/qubes-device-agent-autostart b/qui/devices/qubes-device-agent-autostart new file mode 100755 index 00000000..8e001ce3 --- /dev/null +++ b/qui/devices/qubes-device-agent-autostart @@ -0,0 +1,9 @@ +#!/bin/sh + +if ! test -f /var/run/qubes-service/guivm && \ + ! test -f /etc/qubes-release; then + echo "Not in GuiVM or dom0. Exiting." + exit 0 +fi + +exec qubes-device-agent "$@" diff --git a/qui/tools/__init__.py b/qui/tools/__init__.py new file mode 100644 index 00000000..1d211911 --- /dev/null +++ b/qui/tools/__init__.py @@ -0,0 +1,20 @@ +# coding=utf-8 +# +# The Qubes OS Project, https://www.qubes-os.org +# +# Copyright (C) 2024 Piotr Bartman-Szwarc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py new file mode 100755 index 00000000..65017758 --- /dev/null +++ b/qui/tools/attach_confirm.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org +# +# Copyright (C) 2024 Piotr Bartman-Szwarc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +import os +import sys +import json +import asyncio + +from qubes import Qubes + + +SOCKET_PATH = "/var/run/qubes" + + +def call_socket_service( + remote_domain, service, source_domain, params, socket_path=SOCKET_PATH +): + """ + Call a socket service, either over qrexec or locally. + + The request is JSON-encoded, response is plain ASCII text. + """ + + if remote_domain == source_domain: + return call_socket_service_local( + service, source_domain, params, socket_path + ) + raise NotImplementedError() + # return call_socket_service_remote(remote_domain, service, params) + + +async def call_socket_service_local( + service, source_domain, params, socket_path=SOCKET_PATH +): + if source_domain == "dom0": + header = f"{service} dom0 name dom0\0".encode("ascii") + else: + header = f"{service} {source_domain}\0".encode("ascii") + + path = os.path.join(socket_path, service) + reader, writer = await asyncio.open_unix_connection(path) + writer.write(header) + writer.write(json.dumps(params).encode("ascii")) + writer.write_eof() + await writer.drain() + response = await reader.read() + return response.decode("ascii") + + +def main(): + socket = "device-agent.GUI" + + guivm = sys.argv[1] + + number_of_targets = len(sys.argv) - 5 + doms = Qubes().domains + + params = { + "source": sys.argv[2], + "device_name": sys.argv[4], + "argument": sys.argv[3], + "targets": sys.argv[5:], + "default_target": sys.argv[5] if number_of_targets == 1 else "", + "icons": { + doms[d].name + if doms[d].klass != "DispVM" else f'@dispvm:{doms[d].name}': + doms[d].icon for d in doms.keys() + }, + } + + ask_response = asyncio.run(call_socket_service( + guivm, socket, "dom0", params + )) + + if ask_response.startswith("allow:"): + print(ask_response[len("allow:"):], end="") + exit(0) + else: + exit(1) + + +if __name__ == "__main__": + main() diff --git a/qui/tools/qubes_device_agent.py b/qui/tools/qubes_device_agent.py new file mode 100644 index 00000000..ede88061 --- /dev/null +++ b/qui/tools/qubes_device_agent.py @@ -0,0 +1,714 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org +# +# Copyright (C) 2024 Piotr Bartman-Szwarc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +""" Agent running in user session, responsible for asking the user about device +attachment.""" + +import itertools +import os +import argparse +import asyncio +import json + +import importlib.resources + +# pylint: disable=import-error,wrong-import-position +import gi + +gi.require_version("Gtk", "3.0") +from gi.repository import Gtk, Gdk, GdkPixbuf, GLib, Gio + +# pylint: enable=import-error + +# pylint: disable=wrong-import-order +import gbulb + +# from qrexec.utils import sanitize_domain_name +# from ..server import SocketService + +# pylint: enable=wrong-import-position + +DEVICE_AGENT_SOCKET_PATH = "/var/run/qubes/device-agent.GUI" + + +class VMListModeler: + def __init__(self, options, domains_info=None): + self._entries = {} + self._domains_info = domains_info + self._icons = {} + self._icon_size = 16 + self._theme = Gtk.IconTheme.get_default() + self._create_entries(options) + + def _get_icon(self, name): + if name not in self._icons: + try: + icon = self._theme.load_icon(name, self._icon_size, 0) + except GLib.Error: # pylint: disable=catching-non-exception + icon = self._theme.load_icon("edit-find", self._icon_size, 0) + + self._icons[name] = icon + + return self._icons[name] + + def _create_entries(self, options): + for name, vm in self._domains_info.items(): + if name.startswith("@dispvm:"): + vm_name = name[len("@dispvm:"):] + prefix = "Disposable VM: " + else: + vm_name = name + prefix = "" + # sanitize_domain_name(vm_name, assert_sanitized=True) TODO + + icon = self._get_icon(vm.get("icon", None)) + + display_name = prefix + vm_name + options.get(name, "") + self._entries[display_name] = { + "api_name": vm_name, + "icon": icon, + "vm": vm, + } + + def _get_valid_qube_name(self, combo, entry_box, whitelist): + name = None + + if combo and combo.get_active_id(): + selected = combo.get_active_id() + + if ( + selected in self._entries + and self._entries[selected]["api_name"] in whitelist + ): + name = selected + + if not name and entry_box: + typed = entry_box.get_text() + + if ( + typed in self._entries + and self._entries[typed]["api_name"] in whitelist + ): + name = typed + + return name + + def _combo_change(self, selection_trigger, combo, entry_box, whitelist): + data = None + name = self._get_valid_qube_name(combo, entry_box, whitelist) + + if name: + entry = self._entries[name] + + data = entry["api_name"] + + if entry_box: + entry_box.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, entry["icon"] + ) + else: + if entry_box: + entry_box.set_icon_from_stock( + Gtk.EntryIconPosition.PRIMARY, "gtk-find" + ) + + if selection_trigger: + selection_trigger(data) + + def _entry_activate(self, activation_trigger, combo, entry_box, whitelist): + name = self._get_valid_qube_name(combo, entry_box, whitelist) + + if name: + activation_trigger(entry_box) + + def apply_model( + self, + destination_object, + vm_list, + selection_trigger=None, + activation_trigger=None, + ): + if isinstance(destination_object, Gtk.ComboBox): + list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf, str) + + for entry_no, display_name in zip( + itertools.count(), sorted(self._entries) + ): + entry = self._entries[display_name] + if entry["api_name"] in vm_list: + list_store.append( + [ + entry_no, + display_name, + entry["icon"], + entry["api_name"], + ] + ) + + destination_object.set_model(list_store) + destination_object.set_id_column(1) + + icon_column = Gtk.CellRendererPixbuf() + destination_object.pack_start(icon_column, False) + destination_object.add_attribute(icon_column, "pixbuf", 2) + destination_object.set_entry_text_column(1) + + if destination_object.get_has_entry(): + entry_box = destination_object.get_child() + + area = Gtk.CellAreaBox() + area.pack_start(icon_column, False, False, False) + area.add_attribute(icon_column, "pixbuf", 2) + + completion = Gtk.EntryCompletion.new_with_area(area) + completion.set_inline_selection(True) + completion.set_inline_completion(True) + completion.set_popup_completion(True) + completion.set_popup_single_match(True) + completion.set_model(list_store) + completion.set_text_column(1) + + entry_box.set_completion(completion) + + def qube_matching_function(completion: Gtk.EntryCompletion, + key: str, + iterator: Gtk.TreeIter, + user_data: object) -> bool: + # pylint: disable=unused-argument + modelstr = completion.get_model()[iterator][1] + return key.lower() in modelstr.lower() + + completion.set_match_func(qube_matching_function, None) + + if activation_trigger: + entry_box.connect( + "activate", + lambda entry: self._entry_activate( + activation_trigger, + destination_object, + entry, + vm_list, + ), + ) + + # A Combo with an entry has a text column already + text_column = destination_object.get_cells()[0] + destination_object.reorder(text_column, 1) + else: + entry_box = None + + text_column = Gtk.CellRendererText() + destination_object.pack_start(text_column, False) + destination_object.add_attribute(text_column, "text", 1) + + def changed_function(combo, self=self): + return self._combo_change( + selection_trigger, combo, entry_box, vm_list + ) + + destination_object.connect("changed", changed_function) + changed_function(destination_object) + + else: + raise TypeError( + "Only expecting Gtk.ComboBox objects to want our model." + ) + + def apply_icon(self, entry, qube_name): + if isinstance(entry, Gtk.Entry): + for key, vm_info in self._entries.items(): + if qube_name == vm_info['api_name']: + entry.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, + self._entries[key]["icon"], + ) + break + else: + raise ValueError( + f"The following source qube does not exist: {qube_name}") + else: + raise TypeError( + "Only expecting Gtk.Entry objects to want our icon." + ) + + +class GtkOneTimerHelper: + # pylint: disable=too-few-public-methods + def __init__(self, wait_seconds): + self._wait_seconds = wait_seconds + self._current_timer_id = 0 + self._timer_completed = False + + def _invalidate_timer_completed(self): + self._timer_completed = False + + def _invalidate_current_timer(self): + self._current_timer_id += 1 + + def _timer_check_run(self, timer_id): + if self._current_timer_id == timer_id: + self._timer_run(timer_id) + self._timer_completed = True + else: + pass + + def _timer_run(self, timer_id): + raise NotImplementedError("Not yet implemented") + + def _timer_schedule(self): + self._invalidate_current_timer() + GLib.timeout_add( + int(round(self._wait_seconds * 1000)), + self._timer_check_run, + self._current_timer_id, + ) + + def _timer_has_completed(self): + return self._timer_completed + + +class FocusStealingHelper(GtkOneTimerHelper): + def __init__(self, window, target_button, wait_seconds=1): + GtkOneTimerHelper.__init__(self, wait_seconds) + self._window = window + self._target_button = target_button + + self._window.connect("window-state-event", self._window_state_event) + + self._target_sensitivity = False + self._target_button.set_sensitive(self._target_sensitivity) + + def _window_changed_focus(self, window_is_focused): + self._target_button.set_sensitive(False) + self._invalidate_timer_completed() + + if window_is_focused: + self._timer_schedule() + else: + self._invalidate_current_timer() + + def _window_state_event(self, window, event): + assert ( + window == self._window + ), "Window state callback called with wrong window" + + changed_focus = event.changed_mask & Gdk.WindowState.FOCUSED + window_focus = event.new_window_state & Gdk.WindowState.FOCUSED + + if changed_focus: + self._window_changed_focus(window_focus != 0) + + # Propagate event further + return False + + def _timer_run(self, timer_id): + self._target_button.set_sensitive(self._target_sensitivity) + + def request_sensitivity(self, sensitivity): + if self._timer_has_completed() or not sensitivity: + self._target_button.set_sensitive(sensitivity) + + self._target_sensitivity = sensitivity + + def can_perform_action(self): + return self._timer_has_completed() + + +class RPCConfirmationWindow: + # pylint: disable=too-few-public-methods,too-many-instance-attributes + _source_file_ref = importlib.resources.files("qubes").joinpath( + os.path.join( + "ext", "device_attach_confirm", "AttachConfirmationWindow.glade")) + + _source_id = { + "window": "AttachConfirmationWindow", + "ok": "okButton", + "cancel": "cancelButton", + "source": "sourceEntry", + "device_label": "deviceLabel", + "target": "TargetCombo", + "error_bar": "ErrorBar", + "error_message": "ErrorMessage", + } + + def _clicked_ok(self, source): + assert ( + source is not None + ), "Called the clicked ok callback from no source object" + + if self._can_perform_action(): + self._confirmed = True + self._close() + + def _clicked_cancel(self, button): + assert ( + button == self._rpc_cancel_button + ), "Called the clicked cancel callback through the wrong button" + + if self._can_perform_action(): + self._confirmed = False + self._close() + + def _key_pressed(self, window, key): + assert ( + window == self._rpc_window + ), "Key pressed callback called with wrong window" + + if self._can_perform_action(): + if key.keyval == Gdk.KEY_Escape: + self._confirmed = False + self._close() + + def _update_ok_button_sensitivity(self, data): + valid = data is not None + + if valid: + self._target_name = data + else: + self._target_name = None + + self._focus_helper.request_sensitivity(valid) + + def _show_error(self, error_message): + self._error_message.set_text(error_message) + self._error_bar.set_visible(True) + + def _close_error(self, error_bar, response): + assert ( + error_bar == self._error_bar + ), "Closed the error bar with the wrong error bar as parameter" + assert ( + response is not None + ), "Closed the error bar with None as a response" + + self._error_bar.set_visible(False) + + def _set_initial_target(self, source, target): + if target is not None: + if target == source: + self._show_error( + "Source and target domains must not be the same." + ) + else: + model = self._rpc_combo_box.get_model() + + found = False + for item in model: + if item[3] == target: + found = True + + self._rpc_combo_box.set_active_iter( + model.get_iter(item.path) + ) + + break + + if not found: + self._show_error("Domain '%s' doesn't exist." % target) + + def _can_perform_action(self): + return self._focus_helper.can_perform_action() + + def _connect_events(self): + self._rpc_window.connect("key-press-event", self._key_pressed) + self._rpc_ok_button.connect("clicked", self._clicked_ok) + self._rpc_cancel_button.connect("clicked", self._clicked_cancel) + + self._error_bar.connect("response", self._close_error) + + def __init__( + self, entries_info, source, device_name, argument, targets_list, target=None + ): + # pylint: disable=too-many-arguments + # sanitize_domain_name(source, assert_sanitized=True) TODO + # sanitize_service_name(source, assert_sanitized=True) TODO + + self._gtk_builder = Gtk.Builder() + with importlib.resources.as_file(self._source_file_ref) as path: + self._gtk_builder.add_from_file(str(path)) + self._rpc_window = self._gtk_builder.get_object( + self._source_id["window"] + ) + self._rpc_ok_button = self._gtk_builder.get_object( + self._source_id["ok"] + ) + self._rpc_cancel_button = self._gtk_builder.get_object( + self._source_id["cancel"] + ) + self._device_label = self._gtk_builder.get_object( + self._source_id["device_label"] + ) + self._source_entry = self._gtk_builder.get_object( + self._source_id["source"] + ) + self._rpc_combo_box = self._gtk_builder.get_object( + self._source_id["target"] + ) + self._error_bar = self._gtk_builder.get_object( + self._source_id["error_bar"] + ) + self._error_message = self._gtk_builder.get_object( + self._source_id["error_message"] + ) + self._target_name = None + + self._focus_helper = self._new_focus_stealing_helper() + + self._device_label.set_markup(device_name) + + self._entries_info = entries_info + + options = {name: " " + options for vm_data in targets_list + for name, _, options in (vm_data.partition(" "),)} + list_modeler = self._new_vm_list_modeler(options) + + list_modeler.apply_model( + self._rpc_combo_box, + options.keys(), + selection_trigger=self._update_ok_button_sensitivity, + activation_trigger=self._clicked_ok, + ) + + self._source_entry.set_text(source + ":" + argument) + list_modeler.apply_icon(self._source_entry, source) + + self._confirmed = None + + self._set_initial_target(source, target) + + self._connect_events() + + def _close(self): + self._rpc_window.close() + + async def _wait_for_close(self): + await gbulb.wait_signal(self._rpc_window, "delete-event") + + def _show(self): + self._rpc_window.set_keep_above(True) + self._rpc_window.show_all() + + def _new_vm_list_modeler(self, options): + return VMListModeler(options, self._entries_info) + + def _new_focus_stealing_helper(self): + return FocusStealingHelper(self._rpc_window, self._rpc_ok_button, 1) + + async def confirm_rpc(self): + self._show() + await self._wait_for_close() + + if self._confirmed: + return self._target_name + return False + + +async def confirm_attachment( + entries_info, source, device_name, argument, targets_list, target=None +): + # pylint: disable=too-many-arguments + window = RPCConfirmationWindow( + entries_info, source, device_name, argument, targets_list, target + ) + + return await window.confirm_rpc() + + +def escape_and_format_rpc_text(service, argument=""): + service = GLib.markup_escape_text(service) + argument = GLib.markup_escape_text(argument) + + domain, dot, name = service.partition(".") + if dot and name: + result = f"{domain}.{name}" + else: + result = f"{service}" + + if argument != "+": + result += argument + + return result + + +class SocketService: + def __init__(self, socket_path): + self._socket_path = socket_path + + async def run(self): + server = await self.start() + async with server: + await server.serve_forever() + + async def start(self): + if os.path.exists(self._socket_path): + os.unlink(self._socket_path) + return await asyncio.start_unix_server( + self._client_connected, path=self._socket_path + ) + + async def _client_connected(self, reader, writer): + try: + data = await reader.read() + data = data.decode("ascii") + assert "\0" in data, data + header, json_data = data.split("\0", 1) + + # Note that we process only the first two parts (service and + # source_domain) and disregard the second two parts (target + # specification) that appear when we're running in dom0. + header_parts = header.split(" ") + assert len(header_parts) >= 2, header + service = header_parts[0] + source_domain = header_parts[1] + + params = json.loads(json_data) + + response = await self.handle_request(params, service, source_domain) + + writer.write(response.encode("ascii")) + await writer.drain() + finally: + writer.close() + await writer.wait_closed() + + async def handle_request(self, params, service, source_domain): + raise NotImplementedError() + + +class PolicyAgent(SocketService): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._app = Gtk.Application() + self._app.set_application_id("qubes.qrexec-policy-agent") + self._app.register() + + async def handle_request(self, params, service, source_domain): + if service == "device-agent.GUI": + return await self.handle_ask(params) + raise ValueError("unknown service name: {}".format(service)) + + @staticmethod + async def handle_ask(params): + source = params["source"] + device_name = params["device_name"] + argument = params["argument"] + targets = params["targets"] + default_target = params["default_target"] + + entries_info = {} + for domain_name, icon in params["icons"].items(): + entries_info[domain_name] = {"icon": icon} + + target = await confirm_attachment( + entries_info, + source, + device_name, + argument, + targets, + default_target or None, + ) + + if target: + return f"allow:{target}" + return "deny" + + async def handle_notify(self, params): + resolution = params["resolution"] + service = params["service"] + argument = params["argument"] + source = params["source"] + target = params["target"] + + assert resolution in ["allow", "deny", "fail"], resolution + + self.notify(resolution, service, argument, source, target) + return "" + + def notify(self, resolution, service, argument, source, target): + # pylint: disable=too-many-arguments + if argument == "+": + rpc = service + else: + rpc = service + argument + + if resolution == "allow": + app_icon = None + summary = "Allowed: {service}" + body = ( + "Allowed {rpc} " + "from {source} to {target}" + ) + elif resolution == "deny": + app_icon = "dialog-error" + summary = "Denied: {service}" + body = ( + "Denied {rpc} from {source} to {target}" + ) + elif resolution == "fail": + app_icon = "dialog-warning" + summary = "Failed: {service}" + body = ( + "Failed to execute {rpc} " + "(from {source} to {target})" + ) + else: + assert False, resolution + + # summary is plain text, body is markup + summary = summary.format(service=service) + body = body.format( + rpc=GLib.markup_escape_text(rpc), + source=GLib.markup_escape_text(source), + target=GLib.markup_escape_text(target), + ) + + notification = Gio.Notification.new(summary) + notification.set_priority(Gio.NotificationPriority.NORMAL) + notification.set_body(body) + if app_icon: + icon = Gio.ThemedIcon.new(app_icon) + notification.set_icon(icon) + + self._app.send_notification(None, notification) + + +parser = argparse.ArgumentParser() + +parser.add_argument( + "-s", + "--socket-path", + metavar="DIR", + type=str, + default=DEVICE_AGENT_SOCKET_PATH, + help="path to socket", +) + + +def main(): + args = parser.parse_args() + + gbulb.install() + agent = PolicyAgent(args.socket_path) + + asyncio.run(agent.run()) + + +if __name__ == "__main__": + main() diff --git a/rpm_spec/qubes-desktop-linux-manager.spec.in b/rpm_spec/qubes-desktop-linux-manager.spec.in index 8ab30de2..54f8d886 100644 --- a/rpm_spec/qubes-desktop-linux-manager.spec.in +++ b/rpm_spec/qubes-desktop-linux-manager.spec.in @@ -92,11 +92,14 @@ fi gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %files +/usr/bin/attach-confirm %defattr(-,root,root,-) %dir %{python3_sitelib}/qui-*.egg-info %{python3_sitelib}/qui-*.egg-info/* +/usr/lib/qubes/qubes-device-agent-autostart + %dir %{python3_sitelib}/qui %dir %{python3_sitelib}/qui/__pycache__ %{python3_sitelib}/qui/__pycache__/* @@ -137,6 +140,14 @@ gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %{python3_sitelib}/qui/devices/device_widget.py %{python3_sitelib}/qui/qubes-devices-dark.css %{python3_sitelib}/qui/qubes-devices-light.css +%{python3_sitelib}/qui/devices/AttachConfirmationWindow.glade + +%dir %{python3_sitelib}/qui/tools +%dir %{python3_sitelib}/qui/tools/__pycache__ +%{python3_sitelib}/qui/tools/__pycache__/* +%{python3_sitelib}/qui/tools/__init__.py +%{python3_sitelib}/qui/tools/attach_confirm.py +%{python3_sitelib}/qui/tools/qubes_device_agent.py %dir %{python3_sitelib}/qui/tray/ %dir %{python3_sitelib}/qui/tray/__pycache__ @@ -202,6 +213,7 @@ gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %{_bindir}/qui-updates %{_bindir}/qui-clipboard %{_bindir}/qubes-update-gui +/etc/xdg/autostart/qubes-device-agent.desktop /etc/xdg/autostart/qui-domains.desktop /etc/xdg/autostart/qui-devices.desktop /etc/xdg/autostart/qui-clipboard.desktop diff --git a/setup.py b/setup.py index cee4703f..f7d299b3 100644 --- a/setup.py +++ b/setup.py @@ -32,8 +32,38 @@ def create_mo_files(self): def run(self): self.create_mo_files() + self.install_scripts() super().run() + # create simple scripts that run much faster than "console entry points" + def install_scripts(self): + bin = os.path.join(self.root, "usr/bin") + try: + os.makedirs(bin) + except: + pass + for file, pkg in get_console_scripts(): + path = os.path.join(bin, file) + with open(path, "w") as f: + f.write( +"""#!/usr/bin/python3 +from {} import main +import sys +if __name__ == '__main__': + sys.exit(main()) +""".format(pkg)) + + os.chmod(path, 0o755) + +# don't import: import * is unreliable and there is no need, since this is +# compile time and we have source files +def get_console_scripts(): + for filename in os.listdir('./qui/tools'): + basename, ext = os.path.splitext(os.path.basename(filename)) + if basename == '__init__' or ext != '.py': + continue + yield basename.replace('_', '-'), 'qui.tools.{}'.format(basename) + setuptools.setup( name='qui', @@ -69,7 +99,8 @@ def run(self): "styles/qubes-widgets-base.css", "eol.json", "qubes-devices-light.css", - "qubes-devices-dark.css" + "qubes-devices-dark.css", + "devices/AttachConfirmationWindow.glade" ], 'qubes_config': ["new_qube.glade", "global_config.glade", From 068c9d06cd1b8125d4d5fdd6498daf8e894c5e23 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 20 Aug 2024 09:37:44 +0200 Subject: [PATCH 06/19] q-dev: fix setup.py --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index f7d299b3..c85e631b 100644 --- a/setup.py +++ b/setup.py @@ -32,11 +32,11 @@ def create_mo_files(self): def run(self): self.create_mo_files() - self.install_scripts() + self.install_custom_scripts() super().run() # create simple scripts that run much faster than "console entry points" - def install_scripts(self): + def install_custom_scripts(self): bin = os.path.join(self.root, "usr/bin") try: os.makedirs(bin) From 9691605df63d1adbae4ed024fac48355e23c51f7 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 20 Aug 2024 09:53:15 +0200 Subject: [PATCH 07/19] q-dev: include tools --- rpm_spec/qubes-desktop-linux-manager.spec.in | 1 + setup.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rpm_spec/qubes-desktop-linux-manager.spec.in b/rpm_spec/qubes-desktop-linux-manager.spec.in index 54f8d886..d671f140 100644 --- a/rpm_spec/qubes-desktop-linux-manager.spec.in +++ b/rpm_spec/qubes-desktop-linux-manager.spec.in @@ -93,6 +93,7 @@ gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %files /usr/bin/attach-confirm +/usr/bin/qubes-device-agent %defattr(-,root,root,-) %dir %{python3_sitelib}/qui-*.egg-info diff --git a/setup.py b/setup.py index c85e631b..09104331 100644 --- a/setup.py +++ b/setup.py @@ -73,9 +73,10 @@ def get_console_scripts(): description='Qubes User Interface And Configuration Package', license='GPL2+', url='https://www.qubes-os.org/', - packages=["qui", "qui.updater", "qui.devices", "qui.tray", "qubes_config", - "qubes_config.global_config", "qubes_config.widgets", - "qubes_config.new_qube", 'qubes_config.policy_editor'], + packages=["qui", "qui.updater", "qui.devices", "qui.tools", "qui.tray", + "qubes_config", "qubes_config.global_config", + "qubes_config.widgets", "qubes_config.new_qube", + 'qubes_config.policy_editor'], entry_points={ 'gui_scripts': [ 'qui-domains = qui.tray.domains:main', From 30f7401a3333453d3458399824ed89d958dff8fb Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 20 Aug 2024 11:37:44 +0200 Subject: [PATCH 08/19] q-dev: update glade path --- qui/tools/qubes_device_agent.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/qui/tools/qubes_device_agent.py b/qui/tools/qubes_device_agent.py index ede88061..0b74d31a 100644 --- a/qui/tools/qubes_device_agent.py +++ b/qui/tools/qubes_device_agent.py @@ -333,9 +333,8 @@ def can_perform_action(self): class RPCConfirmationWindow: # pylint: disable=too-few-public-methods,too-many-instance-attributes - _source_file_ref = importlib.resources.files("qubes").joinpath( - os.path.join( - "ext", "device_attach_confirm", "AttachConfirmationWindow.glade")) + _source_file_ref = importlib.resources.files("qui").joinpath( + os.path.join("devices", "AttachConfirmationWindow.glade")) _source_id = { "window": "AttachConfirmationWindow", From ac9e490aa62bce3d065b52c0079354f269ef199f Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 26 Aug 2024 22:25:36 +0200 Subject: [PATCH 09/19] q-dev: simplification of device agent --- qui/tools/attach_confirm.py | 40 +-- qui/tools/qubes_device_agent.py | 516 ++------------------------------ 2 files changed, 26 insertions(+), 530 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index 65017758..c3551256 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -18,52 +18,16 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. -import os import sys -import json import asyncio from qubes import Qubes +from qrexec.server import call_socket_service SOCKET_PATH = "/var/run/qubes" -def call_socket_service( - remote_domain, service, source_domain, params, socket_path=SOCKET_PATH -): - """ - Call a socket service, either over qrexec or locally. - - The request is JSON-encoded, response is plain ASCII text. - """ - - if remote_domain == source_domain: - return call_socket_service_local( - service, source_domain, params, socket_path - ) - raise NotImplementedError() - # return call_socket_service_remote(remote_domain, service, params) - - -async def call_socket_service_local( - service, source_domain, params, socket_path=SOCKET_PATH -): - if source_domain == "dom0": - header = f"{service} dom0 name dom0\0".encode("ascii") - else: - header = f"{service} {source_domain}\0".encode("ascii") - - path = os.path.join(socket_path, service) - reader, writer = await asyncio.open_unix_connection(path) - writer.write(header) - writer.write(json.dumps(params).encode("ascii")) - writer.write_eof() - await writer.drain() - response = await reader.read() - return response.decode("ascii") - - def main(): socket = "device-agent.GUI" @@ -86,7 +50,7 @@ def main(): } ask_response = asyncio.run(call_socket_service( - guivm, socket, "dom0", params + guivm, socket, "dom0", params, SOCKET_PATH )) if ask_response.startswith("allow:"): diff --git a/qui/tools/qubes_device_agent.py b/qui/tools/qubes_device_agent.py index 0b74d31a..ec011655 100644 --- a/qui/tools/qubes_device_agent.py +++ b/qui/tools/qubes_device_agent.py @@ -21,54 +21,40 @@ """ Agent running in user session, responsible for asking the user about device attachment.""" -import itertools import os import argparse import asyncio -import json import importlib.resources # pylint: disable=import-error,wrong-import-position import gi +from qrexec.server import SocketService +from qrexec.utils import sanitize_domain_name +from qubesadmin.device_protocol import DeviceSerializer + gi.require_version("Gtk", "3.0") -from gi.repository import Gtk, Gdk, GdkPixbuf, GLib, Gio +from gi.repository import Gtk, Gdk +from qrexec.tools.qrexec_policy_agent import ( + VMListModeler, RPCConfirmationWindow) # pylint: enable=import-error # pylint: disable=wrong-import-order import gbulb -# from qrexec.utils import sanitize_domain_name -# from ..server import SocketService - # pylint: enable=wrong-import-position DEVICE_AGENT_SOCKET_PATH = "/var/run/qubes/device-agent.GUI" -class VMListModeler: +class VMAndPortListModeler(VMListModeler): def __init__(self, options, domains_info=None): - self._entries = {} - self._domains_info = domains_info - self._icons = {} - self._icon_size = 16 - self._theme = Gtk.IconTheme.get_default() - self._create_entries(options) - - def _get_icon(self, name): - if name not in self._icons: - try: - icon = self._theme.load_icon(name, self._icon_size, 0) - except GLib.Error: # pylint: disable=catching-non-exception - icon = self._theme.load_icon("edit-find", self._icon_size, 0) - - self._icons[name] = icon + super().__init__(domains_info) + self._override_entries(options) - return self._icons[name] - - def _create_entries(self, options): + def _override_entries(self, options): for name, vm in self._domains_info.items(): if name.startswith("@dispvm:"): vm_name = name[len("@dispvm:"):] @@ -76,7 +62,7 @@ def _create_entries(self, options): else: vm_name = name prefix = "" - # sanitize_domain_name(vm_name, assert_sanitized=True) TODO + sanitize_domain_name(vm_name, assert_sanitized=True) icon = self._get_icon(vm.get("icon", None)) @@ -87,150 +73,6 @@ def _create_entries(self, options): "vm": vm, } - def _get_valid_qube_name(self, combo, entry_box, whitelist): - name = None - - if combo and combo.get_active_id(): - selected = combo.get_active_id() - - if ( - selected in self._entries - and self._entries[selected]["api_name"] in whitelist - ): - name = selected - - if not name and entry_box: - typed = entry_box.get_text() - - if ( - typed in self._entries - and self._entries[typed]["api_name"] in whitelist - ): - name = typed - - return name - - def _combo_change(self, selection_trigger, combo, entry_box, whitelist): - data = None - name = self._get_valid_qube_name(combo, entry_box, whitelist) - - if name: - entry = self._entries[name] - - data = entry["api_name"] - - if entry_box: - entry_box.set_icon_from_pixbuf( - Gtk.EntryIconPosition.PRIMARY, entry["icon"] - ) - else: - if entry_box: - entry_box.set_icon_from_stock( - Gtk.EntryIconPosition.PRIMARY, "gtk-find" - ) - - if selection_trigger: - selection_trigger(data) - - def _entry_activate(self, activation_trigger, combo, entry_box, whitelist): - name = self._get_valid_qube_name(combo, entry_box, whitelist) - - if name: - activation_trigger(entry_box) - - def apply_model( - self, - destination_object, - vm_list, - selection_trigger=None, - activation_trigger=None, - ): - if isinstance(destination_object, Gtk.ComboBox): - list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf, str) - - for entry_no, display_name in zip( - itertools.count(), sorted(self._entries) - ): - entry = self._entries[display_name] - if entry["api_name"] in vm_list: - list_store.append( - [ - entry_no, - display_name, - entry["icon"], - entry["api_name"], - ] - ) - - destination_object.set_model(list_store) - destination_object.set_id_column(1) - - icon_column = Gtk.CellRendererPixbuf() - destination_object.pack_start(icon_column, False) - destination_object.add_attribute(icon_column, "pixbuf", 2) - destination_object.set_entry_text_column(1) - - if destination_object.get_has_entry(): - entry_box = destination_object.get_child() - - area = Gtk.CellAreaBox() - area.pack_start(icon_column, False, False, False) - area.add_attribute(icon_column, "pixbuf", 2) - - completion = Gtk.EntryCompletion.new_with_area(area) - completion.set_inline_selection(True) - completion.set_inline_completion(True) - completion.set_popup_completion(True) - completion.set_popup_single_match(True) - completion.set_model(list_store) - completion.set_text_column(1) - - entry_box.set_completion(completion) - - def qube_matching_function(completion: Gtk.EntryCompletion, - key: str, - iterator: Gtk.TreeIter, - user_data: object) -> bool: - # pylint: disable=unused-argument - modelstr = completion.get_model()[iterator][1] - return key.lower() in modelstr.lower() - - completion.set_match_func(qube_matching_function, None) - - if activation_trigger: - entry_box.connect( - "activate", - lambda entry: self._entry_activate( - activation_trigger, - destination_object, - entry, - vm_list, - ), - ) - - # A Combo with an entry has a text column already - text_column = destination_object.get_cells()[0] - destination_object.reorder(text_column, 1) - else: - entry_box = None - - text_column = Gtk.CellRendererText() - destination_object.pack_start(text_column, False) - destination_object.add_attribute(text_column, "text", 1) - - def changed_function(combo, self=self): - return self._combo_change( - selection_trigger, combo, entry_box, vm_list - ) - - destination_object.connect("changed", changed_function) - changed_function(destination_object) - - else: - raise TypeError( - "Only expecting Gtk.ComboBox objects to want our model." - ) - def apply_icon(self, entry, qube_name): if isinstance(entry, Gtk.Entry): for key, vm_info in self._entries.items(): @@ -249,89 +91,7 @@ def apply_icon(self, entry, qube_name): ) -class GtkOneTimerHelper: - # pylint: disable=too-few-public-methods - def __init__(self, wait_seconds): - self._wait_seconds = wait_seconds - self._current_timer_id = 0 - self._timer_completed = False - - def _invalidate_timer_completed(self): - self._timer_completed = False - - def _invalidate_current_timer(self): - self._current_timer_id += 1 - - def _timer_check_run(self, timer_id): - if self._current_timer_id == timer_id: - self._timer_run(timer_id) - self._timer_completed = True - else: - pass - - def _timer_run(self, timer_id): - raise NotImplementedError("Not yet implemented") - - def _timer_schedule(self): - self._invalidate_current_timer() - GLib.timeout_add( - int(round(self._wait_seconds * 1000)), - self._timer_check_run, - self._current_timer_id, - ) - - def _timer_has_completed(self): - return self._timer_completed - - -class FocusStealingHelper(GtkOneTimerHelper): - def __init__(self, window, target_button, wait_seconds=1): - GtkOneTimerHelper.__init__(self, wait_seconds) - self._window = window - self._target_button = target_button - - self._window.connect("window-state-event", self._window_state_event) - - self._target_sensitivity = False - self._target_button.set_sensitive(self._target_sensitivity) - - def _window_changed_focus(self, window_is_focused): - self._target_button.set_sensitive(False) - self._invalidate_timer_completed() - - if window_is_focused: - self._timer_schedule() - else: - self._invalidate_current_timer() - - def _window_state_event(self, window, event): - assert ( - window == self._window - ), "Window state callback called with wrong window" - - changed_focus = event.changed_mask & Gdk.WindowState.FOCUSED - window_focus = event.new_window_state & Gdk.WindowState.FOCUSED - - if changed_focus: - self._window_changed_focus(window_focus != 0) - - # Propagate event further - return False - - def _timer_run(self, timer_id): - self._target_button.set_sensitive(self._target_sensitivity) - - def request_sensitivity(self, sensitivity): - if self._timer_has_completed() or not sensitivity: - self._target_button.set_sensitive(sensitivity) - - self._target_sensitivity = sensitivity - - def can_perform_action(self): - return self._timer_has_completed() - - -class RPCConfirmationWindow: +class AttachmentConfirmationWindow(RPCConfirmationWindow): # pylint: disable=too-few-public-methods,too-many-instance-attributes _source_file_ref = importlib.resources.files("qui").joinpath( os.path.join("devices", "AttachConfirmationWindow.glade")) @@ -347,97 +107,14 @@ class RPCConfirmationWindow: "error_message": "ErrorMessage", } - def _clicked_ok(self, source): - assert ( - source is not None - ), "Called the clicked ok callback from no source object" - - if self._can_perform_action(): - self._confirmed = True - self._close() - - def _clicked_cancel(self, button): - assert ( - button == self._rpc_cancel_button - ), "Called the clicked cancel callback through the wrong button" - - if self._can_perform_action(): - self._confirmed = False - self._close() - - def _key_pressed(self, window, key): - assert ( - window == self._rpc_window - ), "Key pressed callback called with wrong window" - - if self._can_perform_action(): - if key.keyval == Gdk.KEY_Escape: - self._confirmed = False - self._close() - - def _update_ok_button_sensitivity(self, data): - valid = data is not None - - if valid: - self._target_name = data - else: - self._target_name = None - - self._focus_helper.request_sensitivity(valid) - - def _show_error(self, error_message): - self._error_message.set_text(error_message) - self._error_bar.set_visible(True) - - def _close_error(self, error_bar, response): - assert ( - error_bar == self._error_bar - ), "Closed the error bar with the wrong error bar as parameter" - assert ( - response is not None - ), "Closed the error bar with None as a response" - - self._error_bar.set_visible(False) - - def _set_initial_target(self, source, target): - if target is not None: - if target == source: - self._show_error( - "Source and target domains must not be the same." - ) - else: - model = self._rpc_combo_box.get_model() - - found = False - for item in model: - if item[3] == target: - found = True - - self._rpc_combo_box.set_active_iter( - model.get_iter(item.path) - ) - - break - - if not found: - self._show_error("Domain '%s' doesn't exist." % target) - - def _can_perform_action(self): - return self._focus_helper.can_perform_action() - - def _connect_events(self): - self._rpc_window.connect("key-press-event", self._key_pressed) - self._rpc_ok_button.connect("clicked", self._clicked_ok) - self._rpc_cancel_button.connect("clicked", self._clicked_cancel) - - self._error_bar.connect("response", self._close_error) - def __init__( self, entries_info, source, device_name, argument, targets_list, target=None ): # pylint: disable=too-many-arguments - # sanitize_domain_name(source, assert_sanitized=True) TODO - # sanitize_service_name(source, assert_sanitized=True) TODO + sanitize_domain_name(source, assert_sanitized=True) + DeviceSerializer.sanitize_str( + device_name, DeviceSerializer.ALLOWED_CHARS_PARAM, + error_message="Invalid device name") self._gtk_builder = Gtk.Builder() with importlib.resources.as_file(self._source_file_ref) as path: @@ -494,117 +171,31 @@ def __init__( self._connect_events() - def _close(self): - self._rpc_window.close() - - async def _wait_for_close(self): - await gbulb.wait_signal(self._rpc_window, "delete-event") - - def _show(self): - self._rpc_window.set_keep_above(True) - self._rpc_window.show_all() - def _new_vm_list_modeler(self, options): - return VMListModeler(options, self._entries_info) - - def _new_focus_stealing_helper(self): - return FocusStealingHelper(self._rpc_window, self._rpc_ok_button, 1) - - async def confirm_rpc(self): - self._show() - await self._wait_for_close() - - if self._confirmed: - return self._target_name - return False + return VMAndPortListModeler(options, self._entries_info) async def confirm_attachment( entries_info, source, device_name, argument, targets_list, target=None ): # pylint: disable=too-many-arguments - window = RPCConfirmationWindow( + window = AttachmentConfirmationWindow( entries_info, source, device_name, argument, targets_list, target ) return await window.confirm_rpc() -def escape_and_format_rpc_text(service, argument=""): - service = GLib.markup_escape_text(service) - argument = GLib.markup_escape_text(argument) - - domain, dot, name = service.partition(".") - if dot and name: - result = f"{domain}.{name}" - else: - result = f"{service}" - - if argument != "+": - result += argument - - return result - - -class SocketService: - def __init__(self, socket_path): - self._socket_path = socket_path - - async def run(self): - server = await self.start() - async with server: - await server.serve_forever() - - async def start(self): - if os.path.exists(self._socket_path): - os.unlink(self._socket_path) - return await asyncio.start_unix_server( - self._client_connected, path=self._socket_path - ) - - async def _client_connected(self, reader, writer): - try: - data = await reader.read() - data = data.decode("ascii") - assert "\0" in data, data - header, json_data = data.split("\0", 1) - - # Note that we process only the first two parts (service and - # source_domain) and disregard the second two parts (target - # specification) that appear when we're running in dom0. - header_parts = header.split(" ") - assert len(header_parts) >= 2, header - service = header_parts[0] - source_domain = header_parts[1] - - params = json.loads(json_data) - - response = await self.handle_request(params, service, source_domain) - - writer.write(response.encode("ascii")) - await writer.drain() - finally: - writer.close() - await writer.wait_closed() - - async def handle_request(self, params, service, source_domain): - raise NotImplementedError() - - -class PolicyAgent(SocketService): +class DeviceAgent(SocketService): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._app = Gtk.Application() - self._app.set_application_id("qubes.qrexec-policy-agent") + self._app.set_application_id("qubes.device-agent") self._app.register() async def handle_request(self, params, service, source_domain): - if service == "device-agent.GUI": - return await self.handle_ask(params) - raise ValueError("unknown service name: {}".format(service)) - - @staticmethod - async def handle_ask(params): + if service != "device-agent.GUI": + raise ValueError("unknown service name: {}".format(service)) source = params["source"] device_name = params["device_name"] argument = params["argument"] @@ -628,65 +219,6 @@ async def handle_ask(params): return f"allow:{target}" return "deny" - async def handle_notify(self, params): - resolution = params["resolution"] - service = params["service"] - argument = params["argument"] - source = params["source"] - target = params["target"] - - assert resolution in ["allow", "deny", "fail"], resolution - - self.notify(resolution, service, argument, source, target) - return "" - - def notify(self, resolution, service, argument, source, target): - # pylint: disable=too-many-arguments - if argument == "+": - rpc = service - else: - rpc = service + argument - - if resolution == "allow": - app_icon = None - summary = "Allowed: {service}" - body = ( - "Allowed {rpc} " - "from {source} to {target}" - ) - elif resolution == "deny": - app_icon = "dialog-error" - summary = "Denied: {service}" - body = ( - "Denied {rpc} from {source} to {target}" - ) - elif resolution == "fail": - app_icon = "dialog-warning" - summary = "Failed: {service}" - body = ( - "Failed to execute {rpc} " - "(from {source} to {target})" - ) - else: - assert False, resolution - - # summary is plain text, body is markup - summary = summary.format(service=service) - body = body.format( - rpc=GLib.markup_escape_text(rpc), - source=GLib.markup_escape_text(source), - target=GLib.markup_escape_text(target), - ) - - notification = Gio.Notification.new(summary) - notification.set_priority(Gio.NotificationPriority.NORMAL) - notification.set_body(body) - if app_icon: - icon = Gio.ThemedIcon.new(app_icon) - notification.set_icon(icon) - - self._app.send_notification(None, notification) - parser = argparse.ArgumentParser() @@ -704,7 +236,7 @@ def main(): args = parser.parse_args() gbulb.install() - agent = PolicyAgent(args.socket_path) + agent = DeviceAgent(args.socket_path) asyncio.run(agent.run()) From 12dba6c933da0858352af7b8424d602631635da7 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 27 Aug 2024 00:42:12 +0200 Subject: [PATCH 10/19] q-dev: cleanup --- qui/tools/attach_confirm.py | 4 ++-- qui/tools/qubes_device_agent.py | 30 +++++++++++++++++------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index c3551256..2c8aad30 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -55,9 +55,9 @@ def main(): if ask_response.startswith("allow:"): print(ask_response[len("allow:"):], end="") - exit(0) + sys.exit(0) else: - exit(1) + sys.exit(1) if __name__ == "__main__": diff --git a/qui/tools/qubes_device_agent.py b/qui/tools/qubes_device_agent.py index ec011655..37a5a3b2 100644 --- a/qui/tools/qubes_device_agent.py +++ b/qui/tools/qubes_device_agent.py @@ -30,14 +30,8 @@ # pylint: disable=import-error,wrong-import-position import gi -from qrexec.server import SocketService -from qrexec.utils import sanitize_domain_name -from qubesadmin.device_protocol import DeviceSerializer - gi.require_version("Gtk", "3.0") -from gi.repository import Gtk, Gdk -from qrexec.tools.qrexec_policy_agent import ( - VMListModeler, RPCConfirmationWindow) +from gi.repository import Gtk # pylint: enable=import-error @@ -46,6 +40,13 @@ # pylint: enable=wrong-import-position +from qrexec.server import SocketService +from qrexec.utils import sanitize_domain_name +from qrexec.tools.qrexec_policy_agent import ( + VMListModeler, RPCConfirmationWindow) +from qubesadmin.device_protocol import DeviceSerializer + + DEVICE_AGENT_SOCKET_PATH = "/var/run/qubes/device-agent.GUI" @@ -75,11 +76,10 @@ def _override_entries(self, options): def apply_icon(self, entry, qube_name): if isinstance(entry, Gtk.Entry): - for key, vm_info in self._entries.items(): + for vm_info in self._entries.values(): if qube_name == vm_info['api_name']: entry.set_icon_from_pixbuf( - Gtk.EntryIconPosition.PRIMARY, - self._entries[key]["icon"], + Gtk.EntryIconPosition.PRIMARY, vm_info["icon"], ) break else: @@ -107,8 +107,12 @@ class AttachmentConfirmationWindow(RPCConfirmationWindow): "error_message": "ErrorMessage", } + # We reuse most parts of superclass, but we need custom init, + # so we DO NOT call super().__init__() + # pylint: disable=super-init-not-called def __init__( - self, entries_info, source, device_name, argument, targets_list, target=None + self, + entries_info, source, device_name, argument, targets_list, target=None ): # pylint: disable=too-many-arguments sanitize_domain_name(source, assert_sanitized=True) @@ -153,7 +157,7 @@ def __init__( options = {name: " " + options for vm_data in targets_list for name, _, options in (vm_data.partition(" "),)} - list_modeler = self._new_vm_list_modeler(options) + list_modeler = self._new_vm_list_modeler_overridden(options) list_modeler.apply_model( self._rpc_combo_box, @@ -171,7 +175,7 @@ def __init__( self._connect_events() - def _new_vm_list_modeler(self, options): + def _new_vm_list_modeler_overridden(self, options): return VMAndPortListModeler(options, self._entries_info) From 9a3cc1572920edf9ed38f24d5092eaa7b6cf4751 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 27 Aug 2024 14:33:10 +0200 Subject: [PATCH 11/19] q-dev: remove guivm from args of attach-confirm --- qui/tools/attach_confirm.py | 26 +++++++++++++++++--------- qui/tools/qubes_device_agent.py | 2 ++ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index 2c8aad30..02db6de4 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -21,7 +21,7 @@ import sys import asyncio -from qubes import Qubes +import qubes from qrexec.server import call_socket_service @@ -31,17 +31,25 @@ def main(): socket = "device-agent.GUI" - guivm = sys.argv[1] + app = qubes.Qubes() + system_info = qubes.api.internal.get_system_info(app) + doms = app.domains - number_of_targets = len(sys.argv) - 5 - doms = Qubes().domains + try: + guivm = system_info["domains"]["dom0"]["guivm"] + except KeyError: + guivm = "dom0" + if guivm is None: + guivm = "dom0" + + number_of_targets = len(sys.argv) - 4 params = { - "source": sys.argv[2], - "device_name": sys.argv[4], - "argument": sys.argv[3], - "targets": sys.argv[5:], - "default_target": sys.argv[5] if number_of_targets == 1 else "", + "source": sys.argv[1], + "device_name": sys.argv[3], + "argument": sys.argv[2], + "targets": sys.argv[4:], + "default_target": sys.argv[4] if number_of_targets == 1 else "", "icons": { doms[d].name if doms[d].klass != "DispVM" else f'@dispvm:{doms[d].name}': diff --git a/qui/tools/qubes_device_agent.py b/qui/tools/qubes_device_agent.py index 37a5a3b2..0ef0c330 100644 --- a/qui/tools/qubes_device_agent.py +++ b/qui/tools/qubes_device_agent.py @@ -56,6 +56,7 @@ def __init__(self, options, domains_info=None): self._override_entries(options) def _override_entries(self, options): + self._entries = {} for name, vm in self._domains_info.items(): if name.startswith("@dispvm:"): vm_name = name[len("@dispvm:"):] @@ -68,6 +69,7 @@ def _override_entries(self, options): icon = self._get_icon(vm.get("icon", None)) display_name = prefix + vm_name + options.get(name, "") + display_name = display_name.strip() self._entries[display_name] = { "api_name": vm_name, "icon": icon, From 1315da634f9f7d202362416df33c01c821c9beec Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Wed, 28 Aug 2024 12:17:41 +0200 Subject: [PATCH 12/19] q-dev: drop dependence on qubes core --- qui/tools/attach_confirm.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index 02db6de4..afe9d76a 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -21,9 +21,10 @@ import sys import asyncio -import qubes +import qubesadmin from qrexec.server import call_socket_service +from qrexec.utils import get_system_info SOCKET_PATH = "/var/run/qubes" @@ -31,8 +32,8 @@ def main(): socket = "device-agent.GUI" - app = qubes.Qubes() - system_info = qubes.api.internal.get_system_info(app) + app = qubesadmin.Qubes() + system_info = get_system_info() doms = app.domains try: From 6ee680e75aed35f7f4c15a3826f59366932abaa2 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Wed, 28 Aug 2024 15:08:19 +0200 Subject: [PATCH 13/19] q-dev: make pylint happy --- qui/tools/attach_confirm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index afe9d76a..1c16e2d4 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -52,9 +52,9 @@ def main(): "targets": sys.argv[4:], "default_target": sys.argv[4] if number_of_targets == 1 else "", "icons": { - doms[d].name - if doms[d].klass != "DispVM" else f'@dispvm:{doms[d].name}': - doms[d].icon for d in doms.keys() + dom.name + if dom.klass != "DispVM" else f'@dispvm:{dom.name}': + dom.icon for dom in doms.values() }, } From 31a425f1db6a7027ef76668cd4feae3c05026d11 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 8 Oct 2024 10:18:17 +0200 Subject: [PATCH 14/19] q-dev: remove literals --- qui/tools/attach_confirm.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/qui/tools/attach_confirm.py b/qui/tools/attach_confirm.py index 1c16e2d4..74db6d04 100755 --- a/qui/tools/attach_confirm.py +++ b/qui/tools/attach_confirm.py @@ -27,6 +27,10 @@ from qrexec.utils import get_system_info SOCKET_PATH = "/var/run/qubes" +SOURCE = 1 +ARGUMENT = SOURCE + 1 +DEV_NAME = ARGUMENT + 1 +TARGETS = DEV_NAME + 1 def main(): @@ -43,14 +47,14 @@ def main(): if guivm is None: guivm = "dom0" - number_of_targets = len(sys.argv) - 4 + number_of_targets = len(sys.argv) - TARGETS params = { - "source": sys.argv[1], - "device_name": sys.argv[3], - "argument": sys.argv[2], - "targets": sys.argv[4:], - "default_target": sys.argv[4] if number_of_targets == 1 else "", + "source": sys.argv[SOURCE], + "device_name": sys.argv[DEV_NAME], + "argument": sys.argv[ARGUMENT], + "targets": sys.argv[TARGETS:], + "default_target": sys.argv[TARGETS] if number_of_targets == 1 else "", "icons": { dom.name if dom.klass != "DispVM" else f'@dispvm:{dom.name}': From 0351410cec0ccb614f02fef762f291786392f9ad Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 14 Oct 2024 22:26:06 +0200 Subject: [PATCH 15/19] q-dev: rename attach-confirm -> qubes-device-attach-confirm --- .../{attach_confirm.py => qubes_device_attach_confirm.py} | 0 rpm_spec/qubes-desktop-linux-manager.spec.in | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename qui/tools/{attach_confirm.py => qubes_device_attach_confirm.py} (100%) diff --git a/qui/tools/attach_confirm.py b/qui/tools/qubes_device_attach_confirm.py similarity index 100% rename from qui/tools/attach_confirm.py rename to qui/tools/qubes_device_attach_confirm.py diff --git a/rpm_spec/qubes-desktop-linux-manager.spec.in b/rpm_spec/qubes-desktop-linux-manager.spec.in index d671f140..5d9fadbe 100644 --- a/rpm_spec/qubes-desktop-linux-manager.spec.in +++ b/rpm_spec/qubes-desktop-linux-manager.spec.in @@ -92,7 +92,7 @@ fi gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %files -/usr/bin/attach-confirm +/usr/bin/qubes-device-attach-confirm /usr/bin/qubes-device-agent %defattr(-,root,root,-) @@ -147,7 +147,7 @@ gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %dir %{python3_sitelib}/qui/tools/__pycache__ %{python3_sitelib}/qui/tools/__pycache__/* %{python3_sitelib}/qui/tools/__init__.py -%{python3_sitelib}/qui/tools/attach_confirm.py +%{python3_sitelib}/qui/tools/qubes_device_attach_confirm.py %{python3_sitelib}/qui/tools/qubes_device_agent.py %dir %{python3_sitelib}/qui/tray/ From 92ec055a94fb744201dba296afd6db36590f9c36 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 14 Oct 2024 23:00:19 +0200 Subject: [PATCH 16/19] q-dev: add short way to create DeviceAssignment --- qui/devices/backend.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/qui/devices/backend.py b/qui/devices/backend.py index 0b622992..e7204418 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -233,9 +233,9 @@ def attach_to_vm(self, vm: VM): Perform attachment to provided VM. """ try: - assignment = DeviceAssignment(VirtualDevice(Port( - self.backend_domain, self.id_string, self.device_class), - device_id=self._device_id)) + assignment = DeviceAssignment.new(self.backend_domain, + port_id=self.id_string, devclass=self.device_class, + device_id=self._device_id) vm.vm_object.devices[self.device_class].attach(assignment) self.gtk_app.emit_notification( @@ -265,8 +265,8 @@ def detach_from_vm(self, vm: VM): Gio.NotificationPriority.NORMAL, notification_id=self.notification_id) try: - assignment = DeviceAssignment(VirtualDevice(Port( - self.backend_domain, self._ident, self.device_class))) + assignment = DeviceAssignment.new( + self.backend_domain, self._ident, self.device_class) vm.vm_object.devices[self.device_class].detach(assignment) except qubesadmin.exc.QubesException as ex: self.gtk_app.emit_notification( From 1ac753f091090f820869b5c34ad99dba1b8bdc49 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 14 Oct 2024 23:25:17 +0200 Subject: [PATCH 17/19] q-dev: better variable naming --- qui/devices/device_widget.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/qui/devices/device_widget.py b/qui/devices/device_widget.py index fe034b4d..942ae298 100644 --- a/qui/devices/device_widget.py +++ b/qui/devices/device_widget.py @@ -75,6 +75,7 @@ def __init__(self, app_name, qapp, dispatcher): super().__init__() self.name: str = app_name + # maps: port to connected device (e.g., sys-usb:sda -> block device) self.devices: Dict[str, backend.Device] = {} self.vms: Set[backend.VM] = set() self.dispvm_templates: Set[backend.VM] = set() @@ -130,10 +131,10 @@ def device_list_update(self, vm, _event, **_kwargs): except qubesadmin.exc.QubesException: changed_devices = {} # VM was removed - for dev_name, dev in changed_devices.items(): - if dev_name not in self.devices: + for dev_port, dev in changed_devices.items(): + if dev_port not in self.devices: dev.connection_timestamp = time.monotonic() - self.devices[dev_name] = dev + self.devices[dev_port] = dev self.emit_notification( _("Device available"), _("Device {} is available.").format(dev.description), @@ -141,19 +142,19 @@ def device_list_update(self, vm, _event, **_kwargs): notification_id=dev.notification_id) dev_to_remove = [] - for dev_name, dev in self.devices.items(): + for dev_port, dev in self.devices.items(): if dev.backend_domain != vm: continue - if dev_name not in changed_devices: - dev_to_remove.append((dev_name, dev)) + if dev_port not in changed_devices: + dev_to_remove.append((dev_port, dev)) - for dev_name, dev in dev_to_remove: + for dev_port, dev in dev_to_remove: self.emit_notification( _("Device removed"), _("Device {} has been removed.").format(dev.description), Gio.NotificationPriority.NORMAL, notification_id=dev.notification_id) - del self.devices[dev_name] + del self.devices[dev_port] def initialize_vm_data(self): for vm in self.qapp.domains: From a88b427b3de187d6cc107c3172be9d0913eb62b4 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Fri, 18 Oct 2024 10:08:05 +0200 Subject: [PATCH 18/19] q-dev: cleanup --- qui/devices/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qui/devices/backend.py b/qui/devices/backend.py index e7204418..b496a73e 100644 --- a/qui/devices/backend.py +++ b/qui/devices/backend.py @@ -24,7 +24,7 @@ import qubesadmin.devices import qubesadmin.vm from qubesadmin.utils import size_to_human -from qubesadmin.device_protocol import Port, VirtualDevice, DeviceAssignment +from qubesadmin.device_protocol import DeviceAssignment import gi gi.require_version('Gtk', '3.0') # isort:skip From b149981b06a82233a205d75703caa5baf9e30e65 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Fri, 18 Oct 2024 21:21:02 +0200 Subject: [PATCH 19/19] q-dev: remove qubes-device-attach-confirm --- qui/tools/qubes_device_attach_confirm.py | 77 -------------------- rpm_spec/qubes-desktop-linux-manager.spec.in | 2 - 2 files changed, 79 deletions(-) delete mode 100755 qui/tools/qubes_device_attach_confirm.py diff --git a/qui/tools/qubes_device_attach_confirm.py b/qui/tools/qubes_device_attach_confirm.py deleted file mode 100755 index 74db6d04..00000000 --- a/qui/tools/qubes_device_attach_confirm.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/python -# -# The Qubes OS Project, https://www.qubes-os.org -# -# Copyright (C) 2024 Piotr Bartman-Szwarc -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -import sys -import asyncio - -import qubesadmin - -from qrexec.server import call_socket_service -from qrexec.utils import get_system_info - -SOCKET_PATH = "/var/run/qubes" -SOURCE = 1 -ARGUMENT = SOURCE + 1 -DEV_NAME = ARGUMENT + 1 -TARGETS = DEV_NAME + 1 - - -def main(): - socket = "device-agent.GUI" - - app = qubesadmin.Qubes() - system_info = get_system_info() - doms = app.domains - - try: - guivm = system_info["domains"]["dom0"]["guivm"] - except KeyError: - guivm = "dom0" - if guivm is None: - guivm = "dom0" - - number_of_targets = len(sys.argv) - TARGETS - - params = { - "source": sys.argv[SOURCE], - "device_name": sys.argv[DEV_NAME], - "argument": sys.argv[ARGUMENT], - "targets": sys.argv[TARGETS:], - "default_target": sys.argv[TARGETS] if number_of_targets == 1 else "", - "icons": { - dom.name - if dom.klass != "DispVM" else f'@dispvm:{dom.name}': - dom.icon for dom in doms.values() - }, - } - - ask_response = asyncio.run(call_socket_service( - guivm, socket, "dom0", params, SOCKET_PATH - )) - - if ask_response.startswith("allow:"): - print(ask_response[len("allow:"):], end="") - sys.exit(0) - else: - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/rpm_spec/qubes-desktop-linux-manager.spec.in b/rpm_spec/qubes-desktop-linux-manager.spec.in index 5d9fadbe..885e4f83 100644 --- a/rpm_spec/qubes-desktop-linux-manager.spec.in +++ b/rpm_spec/qubes-desktop-linux-manager.spec.in @@ -92,7 +92,6 @@ fi gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %files -/usr/bin/qubes-device-attach-confirm /usr/bin/qubes-device-agent %defattr(-,root,root,-) @@ -147,7 +146,6 @@ gtk-update-icon-cache %{_datadir}/icons/Adwaita &>/dev/null || : %dir %{python3_sitelib}/qui/tools/__pycache__ %{python3_sitelib}/qui/tools/__pycache__/* %{python3_sitelib}/qui/tools/__init__.py -%{python3_sitelib}/qui/tools/qubes_device_attach_confirm.py %{python3_sitelib}/qui/tools/qubes_device_agent.py %dir %{python3_sitelib}/qui/tray/