diff --git a/bless/backends/bluezdbus/characteristic.py b/bless/backends/bluezdbus/characteristic.py index 94dcce8..f25d52b 100644 --- a/bless/backends/bluezdbus/characteristic.py +++ b/bless/backends/bluezdbus/characteristic.py @@ -74,7 +74,7 @@ async def init(self, service: "BlessGATTService"): service : BlessGATTService The service to assign the characteristic to """ - flags: List[Flags] = flags_to_dbus(self._properties) + flags: List[Flags] = [transform_flags_with_permissions(f, self._permissions) for f in flags_to_dbus(self._properties)] # Add to our BlueZDBus app bluez_service: "BlessGATTServiceBlueZDBus" = cast( @@ -118,6 +118,29 @@ def uuid(self) -> str: """The uuid of this characteristic""" return self.obj.get("UUID").value +def transform_flags_with_permissions(flag: Flags, permissions: GATTAttributePermissions) -> Flags: + """ + Returns the encrypted variant of a flag if the corresponding permission is set + + Parameters + ---------- + flag : GATTCharacteristicProperties + The numerical enumeration of a single flag + + permissions: GATTAttributePermissions + The permissions for the characteristic + + Returns + ------- + List[Flags] + A Flags enum value for use in BlueZDBus that has been updated to reflect if it should be encrypted + """ + if flag == Flags.READ and GATTAttributePermissions.read_encryption_required in permissions: + return Flags.ENCRYPT_READ + elif flag == Flags.WRITE and GATTAttributePermissions.write_encryption_required in permissions: + return Flags.ENCRYPT_WRITE + + return flag def flags_to_dbus(flags: GATTCharacteristicProperties) -> List[Flags]: """ diff --git a/test/backends/test_server.py b/test/backends/test_server.py index 134b732..392444a 100644 --- a/test/backends/test_server.py +++ b/test/backends/test_server.py @@ -2,6 +2,7 @@ import uuid import pytest import asyncio +import os import aioconsole # type: ignore import numpy as np # type: ignore @@ -26,6 +27,7 @@ ) hardware_only = pytest.mark.skipif("os.environ.get('TEST_HARDWARE') is None") +use_encrypted = os.environ.get('TEST_ENCRYPTED') is not None @hardware_only @@ -83,6 +85,13 @@ async def test_server(self): GATTAttributePermissions.readable | GATTAttributePermissions.writeable ) + + if use_encrypted: + print("\nEncryption has been enabled, ensure that you are bonded") + permissions = ( + GATTAttributePermissions.read_encryption_required | + GATTAttributePermissions.write_encryption_required + ) await server.add_new_characteristic( service_uuid,