Skip to content

Commit

Permalink
q-dev: call attach-confirm socket directly
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Oct 21, 2024
1 parent ed84c57 commit dcd8580
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 33 deletions.
25 changes: 13 additions & 12 deletions qubesusbproxy/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,8 @@ class TestVM(qubes.tests.TestEmitter):
def __init__(self, qdb, running=True, name="test-vm", **kwargs):
super().__init__(**kwargs)
self.name = name
self.klass = "AdminVM" if name == "dom0" else "AppVM"
self.icon = "red"
self.untrusted_qdb = TestQubesDB(qdb)
self.libvirt_domain = mock.Mock()
self.features = mock.Mock()
Expand Down Expand Up @@ -859,6 +861,7 @@ def added_assign_setup(attachment=None):
back_vm.app.domains["sys-usb"] = back_vm
back_vm.app.domains["front-vm"] = front
back_vm.app.domains[0] = dom0
back_vm.app.domains["dom0"] = dom0
front.app = back_vm.app
dom0.app = back_vm.app

Expand Down Expand Up @@ -1001,8 +1004,10 @@ def test_013_on_qdb_change_two_fronts(self):
self.ext, {"1-1": {front: assmnt, back: assmnt}}
)

@unittest.mock.patch("asyncio.create_subprocess_shell")
def test_014_failed_confirmation(self, shell):
# call_socket_service returns coroutine
@unittest.mock.patch(
'qubes.ext.utils.call_socket_service', new_callable=AsyncMock)
def test_014_failed_confirmation(self, socket):
back, front = self.added_assign_setup()

exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1")
Expand All @@ -1012,10 +1017,7 @@ def test_014_failed_confirmation(self, shell):
back.devices["usb"]._assigned.append(assmnt)
back.devices["usb"]._exposed.append(exp_dev)

proc = AsyncMock()
shell.return_value = proc
proc.communicate = AsyncMock()
proc.communicate.return_value = (b"nonsense", b"")
socket.return_value = "allow:nonsense"

loop = asyncio.get_event_loop()
self.ext.attach_and_notify = AsyncMock()
Expand All @@ -1026,8 +1028,10 @@ def test_014_failed_confirmation(self, shell):
)
self.ext.attach_and_notify.assert_not_called()

@unittest.mock.patch("asyncio.create_subprocess_shell")
def test_015_successful_confirmation(self, shell):
# call_socket_service returns coroutine
@unittest.mock.patch(
'qubes.ext.utils.call_socket_service', new_callable=AsyncMock)
def test_015_successful_confirmation(self, socket):
back, front = self.added_assign_setup()

exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1")
Expand All @@ -1037,10 +1041,7 @@ def test_015_successful_confirmation(self, shell):
back.devices["usb"]._assigned.append(assmnt)
back.devices["usb"]._exposed.append(exp_dev)

proc = AsyncMock()
shell.return_value = proc
proc.communicate = AsyncMock()
proc.communicate.return_value = (b"front-vm", b"")
socket.return_value = "allow:front-vm"

loop = asyncio.get_event_loop()
self.ext.attach_and_notify = AsyncMock()
Expand Down
70 changes: 49 additions & 21 deletions qubesusbproxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
from qubes import device_protocol
from qubes.device_protocol import VirtualDevice

from qrexec.server import call_socket_service

SOCKET_PATH = "/var/run/qubes"


def device_list_change(
ext: qubes.ext.Extension,
Expand Down Expand Up @@ -158,26 +162,50 @@ def compare_device_cache(vm, devices_cache, current_devices):

async def confirm_device_attachment(device, frontends) -> str:
try:
front_names = [f.name for f in frontends.keys()]
# pylint: disable=consider-using-with
# vm names are safe to just join by spaces
proc = await asyncio.create_subprocess_shell(
" ".join(
[
"qubes-device-attach-confirm",
device.backend_domain.name,
device.port_id,
"'" + device.description + "'",
*front_names,
]
),
stdout=asyncio.subprocess.PIPE,
)
(target_name, _) = await proc.communicate()
target_name = target_name.decode(encoding="ascii")
if target_name in front_names:
return target_name
return ""
return await _do_confirm_device_attachment(device, frontends)
except Exception as exc:
print(exc, file=sys.stderr)
print(str(exc.__class__.__name__) + ":", str(exc), file=sys.stderr)
return ""

async def _do_confirm_device_attachment(device, frontends):
socket = "device-agent.GUI"

app = tuple(frontends.keys())[0].app
doms = app.domains

front_names = [f.name for f in frontends.keys()]

try:
guivm = doms["dom0"].guivm.name
except AttributeError:
guivm = "dom0"

number_of_targets = len(front_names)

params = {
"source": device.backend_domain.name,
"device_name": device.description,
"argument": device.port_id,
"targets": front_names,
"default_target": front_names[0] if number_of_targets == 1 else "",
"icons": {
dom.name
if dom.klass != "DispVM" else f'@dispvm:{dom.name}':
dom.icon for dom in doms.values()
},
}

socked_call = asyncio.create_task(call_socket_service(
guivm, socket, "dom0", params, SOCKET_PATH
))

while not socked_call.done():
await asyncio.sleep(0.1)

ask_response = await socked_call

if ask_response.startswith("allow:"):
chosen = ask_response[len("allow:"):]
if chosen in front_names:
return chosen
return ""

0 comments on commit dcd8580

Please sign in to comment.