Skip to content

Commit

Permalink
q-dev: implementation of self_identity
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 1, 2024
1 parent daac099 commit d261a0f
Showing 1 changed file with 45 additions and 22 deletions.
67 changes: 45 additions & 22 deletions qubesusbproxy/core3ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(self, backend_domain, ident):

self._qdb_ident = ident.replace('.', '_')
self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident
self._vendor_id = None
self._product_id = None

@property
def vendor(self) -> str:
Expand Down Expand Up @@ -176,7 +178,9 @@ def _load_interfaces_from_qubesdb(self) \
def _load_desc_from_qubesdb(self) -> Dict[str, str]:
unknown = "unknown"
result = {"vendor": unknown,
"vendor ID": "0000",
"product": unknown,
"product ID": "0000",
"manufacturer": unknown,
"name": unknown,
"serial": unknown}
Expand All @@ -190,29 +194,34 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]:
if not untrusted_device_desc:
return result
try:
(untrusted_vendor_product, untrusted_manufacturer,
(untrusted_vend_prod_id, untrusted_manufacturer,
untrusted_name, untrusted_serial
) = untrusted_device_desc.split(b' ')
untrusted_vendor, untrusted_product = (
untrusted_vendor_product.split(b':'))
untrusted_vendor_id, untrusted_product_id = (
untrusted_vend_prod_id.split(b':'))
except ValueError:
# desc doesn't contain correctly formatted data,
# but it is not empty. We cannot parse it,
# but we can still put it to the `serial` just to provide
# but we can still put it to the `name` just to provide
# some information to the user.
untrusted_vendor, untrusted_product, untrusted_manufacturer = (
unknown.encode(), unknown.encode(), unknown.encode())
untrusted_vendor_id, untrusted_product_id = ("0000", "0000")
(untrusted_manufacturer, untrusted_serial) = (
unknown.encode() for _ in range(2))
untrusted_name = untrusted_device_desc.replace(b' ', b'_')

# Data successfully loaded, cache these values
self._vendor_id = result["vendor ID"] = self._sanitize(
untrusted_vendor_id)
self._product_id = result["product ID"] = self._sanitize(
untrusted_product_id)
vendor, product = self._get_vendor_and_product_names(
self._sanitize(untrusted_vendor),
self._sanitize(untrusted_product),
)
self._vendor_id, self._product_id)
self._vendor = result["vendor"] = vendor
self._product = result["product"] = product
self._manufacturer = result["manufacturer"] = (
self._sanitize(untrusted_manufacturer))
self._name = result["name"] = (
self._sanitize(untrusted_name))
self._name = result["name"] = (self._sanitize(untrusted_name))
self._name = result["serial"] = (self._sanitize(untrusted_serial))
return result

@staticmethod
Expand All @@ -221,7 +230,7 @@ def _sanitize(
safe_chars: str =
string.ascii_letters + string.digits + string.punctuation + ' '
) -> str:
# b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera'
# rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera'
untrusted_device_desc = untrusted_device_desc.decode(
'unicode_escape', errors='ignore')
return ''.join(
Expand Down Expand Up @@ -254,6 +263,24 @@ def attachment(self):
return None
return connected_to

@property
def self_identity(self) -> str:
"""
Get identification of device not related to port.
"""
if self._vendor_id is None:
vendor_id = self._load_desc_from_qubesdb()["vendor ID"]
else:
vendor_id = self._vendor_id
if self._product_id is None:
product_id = self._load_desc_from_qubesdb()["product ID"]
else:
product_id = self._product_id
interfaces = ''.join(repr(ifc) for ifc in self.interfaces)
serial = self.serial if self.serial != "unknown" else ""
return \
f'{vendor_id}:{product_id}:{serial}:{interfaces}'

@staticmethod
def _get_vendor_and_product_names(
vendor_id: str, product_id: str
Expand Down Expand Up @@ -320,10 +347,6 @@ class QubesUSBException(qubes.exc.QubesException):
pass


class UnrecognizedDevice(QubesUSBException):
pass


def modify_qrexec_policy(service, line, add):
"""
Add/remove *line* to qrexec policy of a *service*.
Expand Down Expand Up @@ -407,7 +430,7 @@ async def _attach_and_notify(self, vm, device, options):
try:
await self.on_device_attach_usb(
vm, 'device-pre-attach:usb', device, options)
except UnrecognizedDevice:
except qubes.devices.UnrecognizedDevice:
return
await vm.fire_event_async(
'device-attach:usb', device=device, options=options)
Expand Down Expand Up @@ -469,7 +492,7 @@ def on_qdb_change(self, vm, event, path):
dev = USBDevice(vm, dev_ident)
asyncio.ensure_future(front_vm.fire_event_async(
'device-attach:usb', device=dev,
options={'identity': dev.full_identity})
options={'identity': dev.self_identity})
)

self.devices_cache[vm.name] = current_devices
Expand Down Expand Up @@ -537,7 +560,7 @@ def on_device_list_attached(self, vm, event, **kwargs):

for dev in self.get_all_devices(vm.app):
if dev.attachment == vm:
yield (dev, {'identity': dev.full_identity})
yield (dev, {'identity': dev.self_identity})

@qubes.ext.handler('device-pre-attach:usb')
async def on_device_attach_usb(self, vm, event, device, options):
Expand All @@ -548,12 +571,12 @@ async def on_device_attach_usb(self, vm, event, device, options):
raise qubes.exc.QubesException(
'USB device attach do not support user options')
identity = options['identity']
if device.full_identity != identity:
if device.self_identity != identity:
print(f"Unrecognized identity, skipping attachment of {device}",
file=sys.stderr)
raise UnrecognizedDevice(
raise qubes.devices.UnrecognizedDevice(
"Device presented identity "
f"{device.full_identity} "
f"{device.self_identity} "
f"does not match expected {identity}"
)

Expand Down

0 comments on commit d261a0f

Please sign in to comment.