From c88b32a406e44b272ec959b69e8ad09211ccc0d5 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 15 Nov 2024 17:57:13 +0800 Subject: [PATCH] Improve Broadcast Scanning --- apps/auracast.py | 35 ++++--- apps/lea_unicast/app.py | 30 ++++-- bumble/core.py | 3 + bumble/hci.py | 6 +- bumble/profiles/bap.py | 162 ++++++++++++++++++++++++--------- examples/run_unicast_server.py | 8 +- tests/bap_test.py | 52 +++++++++++ 7 files changed, 234 insertions(+), 62 deletions(-) diff --git a/apps/auracast.py b/apps/auracast.py index 96f2a23e..2b645605 100644 --- a/apps/auracast.py +++ b/apps/auracast.py @@ -60,7 +60,7 @@ class BroadcastScanner(pyee.EventEmitter): @dataclasses.dataclass class Broadcast(pyee.EventEmitter): - name: str + name: str | None sync: bumble.device.PeriodicAdvertisingSync rssi: int = 0 public_broadcast_announcement: Optional[ @@ -135,7 +135,8 @@ def print(self) -> None: self.sync.advertiser_address, color(self.sync.state.name, 'green'), ) - print(f' {color("Name", "cyan")}: {self.name}') + if self.name is not None: + print(f' {color("Name", "cyan")}: {self.name}') if self.appearance: print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') print(f' {color("RSSI", "cyan")}: {self.rssi}') @@ -174,7 +175,7 @@ def print(self) -> None: print(color(' Codec ID:', 'yellow')) print( color(' Coding Format: ', 'green'), - subgroup.codec_id.coding_format.name, + subgroup.codec_id.codec_id.name, ) print( color(' Company ID: ', 'green'), @@ -274,13 +275,24 @@ async def stop(self) -> None: await self.device.stop_scanning() def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: - if ( - broadcast_name := advertisement.data.get( - bumble.core.AdvertisingData.BROADCAST_NAME + if not ( + ads := advertisement.data.get_all( + bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID ) - ) is None: + ) or not ( + any( + ad + for ad in ads + if isinstance(ad, tuple) + and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE + ) + ): return - assert isinstance(broadcast_name, str) + + broadcast_name = advertisement.data.get( + bumble.core.AdvertisingData.BROADCAST_NAME + ) + assert isinstance(broadcast_name, str) or broadcast_name is None if broadcast := self.broadcasts.get(advertisement.address): broadcast.update(advertisement) @@ -291,7 +303,7 @@ def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: ) async def on_new_broadcast( - self, name: str, advertisement: bumble.device.Advertisement + self, name: str | None, advertisement: bumble.device.Advertisement ) -> None: periodic_advertising_sync = await self.device.create_periodic_advertising_sync( advertiser_address=advertisement.address, @@ -299,10 +311,7 @@ async def on_new_broadcast( sync_timeout=self.sync_timeout, filter_duplicates=self.filter_duplicates, ) - broadcast = self.Broadcast( - name, - periodic_advertising_sync, - ) + broadcast = self.Broadcast(name, periodic_advertising_sync) broadcast.update(advertisement) self.broadcasts[advertisement.address] = broadcast periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) diff --git a/apps/lea_unicast/app.py b/apps/lea_unicast/app.py index 5885dab3..c12d5228 100644 --- a/apps/lea_unicast/app.py +++ b/apps/lea_unicast/app.py @@ -486,7 +486,12 @@ async def run(self) -> None: def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine): codec_config = ase.codec_specific_configuration - assert isinstance(codec_config, bap.CodecSpecificConfiguration) + if ( + not isinstance(codec_config, bap.CodecSpecificConfiguration) + or codec_config.frame_duration is None + or codec_config.audio_channel_allocation is None + ): + return pcm = decode( codec_config.frame_duration.us, codec_config.audio_channel_allocation.channel_count, @@ -495,11 +500,17 @@ def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine): self.device.abort_on('disconnection', self.ui_server.send_audio(pcm)) def on_ase_state_change(ase: ascs.AseStateMachine) -> None: + codec_config = ase.codec_specific_configuration if ase.state == ascs.AseStateMachine.State.STREAMING: - codec_config = ase.codec_specific_configuration - assert isinstance(codec_config, bap.CodecSpecificConfiguration) - assert ase.cis_link if ase.role == ascs.AudioRole.SOURCE: + if ( + not isinstance(codec_config, bap.CodecSpecificConfiguration) + or ase.cis_link is None + or codec_config.octets_per_codec_frame is None + or codec_config.frame_duration is None + or codec_config.codec_frames_per_sdu is None + ): + return ase.cis_link.abort_on( 'disconnection', lc3_source_task( @@ -514,10 +525,17 @@ def on_ase_state_change(ase: ascs.AseStateMachine) -> None: ), ) else: + if not ase.cis_link: + return ase.cis_link.sink = functools.partial(on_pdu, ase=ase) elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED: - codec_config = ase.codec_specific_configuration - assert isinstance(codec_config, bap.CodecSpecificConfiguration) + if ( + not isinstance(codec_config, bap.CodecSpecificConfiguration) + or codec_config.sampling_frequency is None + or codec_config.frame_duration is None + or codec_config.audio_channel_allocation is None + ): + return if ase.role == ascs.AudioRole.SOURCE: setup_encoders( codec_config.sampling_frequency.hz, diff --git a/bumble/core.py b/bumble/core.py index f6d42dd5..5aec826d 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -1624,6 +1624,9 @@ def __bytes__(self): [bytes([len(x[1]) + 1, x[0]]) + x[1] for x in self.ad_structures] ) + def to_bytes(self) -> bytes: + return bytes(self) + def to_string(self, separator=', '): return separator.join( [AdvertisingData.ad_data_to_string(x[0], x[1]) for x in self.ad_structures] diff --git a/bumble/hci.py b/bumble/hci.py index 459b2469..da804ba1 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1482,7 +1482,7 @@ class CodingFormat: vendor_specific_codec_id: int = 0 @classmethod - def parse_from_bytes(cls, data: bytes, offset: int): + def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]: (codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from( ' CodingFormat: + return cls.parse_from_bytes(data, 0)[1] + def to_bytes(self) -> bytes: return struct.pack( ' CodecSpecificConfiguration: offset = 0 - # Allowed default values. - audio_channel_allocation = AudioLocation.NOT_ALLOWED - codec_frames_per_sdu = 1 + sampling_frequency: SamplingFrequency | None = None + frame_duration: FrameDuration | None = None + audio_channel_allocation: AudioLocation | None = None + octets_per_codec_frame: int | None = None + codec_frames_per_sdu: int | None = None + while offset < len(data): length, type = struct.unpack_from('BB', data, offset) offset += 2 @@ -427,8 +430,6 @@ def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration: elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU: codec_frames_per_sdu = value - # It is expected here that if some fields are missing, an error should be raised. - # pylint: disable=possibly-used-before-assignment,used-before-assignment return CodecSpecificConfiguration( sampling_frequency=sampling_frequency, frame_duration=frame_duration, @@ -438,23 +439,43 @@ def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration: ) def __bytes__(self) -> bytes: - return struct.pack( - ' Self: return cls(int.from_bytes(data[:3], 'little')) + def to_bytes(self) -> bytes: + return self.broadcast_id.to_bytes(3, 'little') + + def __bytes__(self) -> bytes: + return self.to_bytes() + + def get_advertising_data(self) -> bytes: + return core.AdvertisingData( + [ + ( + core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, + ( + gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes() + + self.to_bytes() + ), + ) + ] + ).to_bytes() + @dataclasses.dataclass class BasicAudioAnnouncement: @@ -474,26 +514,43 @@ class BIS: index: int codec_specific_configuration: CodecSpecificConfiguration - @dataclasses.dataclass - class CodecInfo: - coding_format: hci.CodecID - company_id: int - vendor_specific_codec_id: int - - @classmethod - def from_bytes(cls, data: bytes) -> Self: - coding_format = hci.CodecID(data[0]) - company_id = int.from_bytes(data[1:3], 'little') - vendor_specific_codec_id = int.from_bytes(data[3:5], 'little') - return cls(coding_format, company_id, vendor_specific_codec_id) + def to_bytes(self) -> bytes: + codec_specific_configuration_bytes = bytes( + self.codec_specific_configuration + ) + return ( + bytes([self.index, len(codec_specific_configuration_bytes)]) + + codec_specific_configuration_bytes + ) + + def __bytes__(self) -> bytes: + return self.to_bytes() @dataclasses.dataclass class Subgroup: - codec_id: BasicAudioAnnouncement.CodecInfo + codec_id: hci.CodingFormat codec_specific_configuration: CodecSpecificConfiguration metadata: le_audio.Metadata bis: List[BasicAudioAnnouncement.BIS] + def to_bytes(self) -> bytes: + metadata_bytes = bytes(self.metadata) + codec_specific_configuration_bytes = bytes( + self.codec_specific_configuration + ) + return ( + bytes([len(self.bis)]) + + self.codec_id.to_bytes() + + bytes([len(codec_specific_configuration_bytes)]) + + codec_specific_configuration_bytes + + bytes([len(metadata_bytes)]) + + metadata_bytes + + b''.join(map(bytes, self.bis)) + ) + + def __bytes__(self) -> bytes: + return self.to_bytes() + presentation_delay: int subgroups: List[BasicAudioAnnouncement.Subgroup] @@ -505,7 +562,7 @@ def from_bytes(cls, data: bytes) -> Self: for _ in range(data[3]): num_bis = data[offset] offset += 1 - codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5]) + codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5]) offset += 5 codec_specific_configuration_length = data[offset] offset += 1 @@ -549,3 +606,26 @@ def from_bytes(cls, data: bytes) -> Self: ) return cls(presentation_delay, subgroups) + + def to_bytes(self) -> bytes: + return ( + self.presentation_delay.to_bytes(3, 'little') + + bytes([len(self.subgroups)]) + + b''.join(map(bytes, self.subgroups)) + ) + + def __bytes__(self) -> bytes: + return self.to_bytes() + + def get_advertising_data(self) -> bytes: + return core.AdvertisingData( + [ + ( + core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, + ( + gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes() + + self.to_bytes() + ), + ) + ] + ).to_bytes() diff --git a/examples/run_unicast_server.py b/examples/run_unicast_server.py index 3ff1c965..b475eb14 100644 --- a/examples/run_unicast_server.py +++ b/examples/run_unicast_server.py @@ -161,7 +161,13 @@ def on_ase_state_change( else: file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb') codec_configuration = ase.codec_specific_configuration - assert isinstance(codec_configuration, CodecSpecificConfiguration) + if ( + not isinstance(codec_configuration, CodecSpecificConfiguration) + or codec_configuration.sampling_frequency is None + or codec_configuration.audio_channel_allocation is None + or codec_configuration.frame_duration is None + ): + return # Write a LC3 header. file_output.write( bytes([0x1C, 0xCC]) # Header. diff --git a/tests/bap_test.py b/tests/bap_test.py index 0b57fcd2..aad635ee 100644 --- a/tests/bap_test.py +++ b/tests/bap_test.py @@ -39,6 +39,8 @@ ) from bumble.profiles.bap import ( AudioLocation, + BasicAudioAnnouncement, + BroadcastAudioAnnouncement, SupportedFrameDuration, SupportedSamplingFrequency, SamplingFrequency, @@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None: assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config +# ----------------------------------------------------------------------------- +def test_broadcast_audio_announcement() -> None: + broadcast_audio_announcement = BroadcastAudioAnnouncement(123456) + assert ( + BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement)) + == broadcast_audio_announcement + ) + + +# ----------------------------------------------------------------------------- +def test_basic_audio_announcement() -> None: + basic_audio_announcement = BasicAudioAnnouncement( + presentation_delay=40000, + subgroups=[ + BasicAudioAnnouncement.Subgroup( + codec_id=CodingFormat(codec_id=CodecID.LC3), + codec_specific_configuration=CodecSpecificConfiguration( + sampling_frequency=SamplingFrequency.FREQ_48000, + frame_duration=FrameDuration.DURATION_10000_US, + octets_per_codec_frame=100, + ), + metadata=Metadata( + [ + Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'), + Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'), + ] + ), + bis=[ + BasicAudioAnnouncement.BIS( + index=0, + codec_specific_configuration=CodecSpecificConfiguration( + audio_channel_allocation=AudioLocation.FRONT_LEFT + ), + ), + BasicAudioAnnouncement.BIS( + index=1, + codec_specific_configuration=CodecSpecificConfiguration( + audio_channel_allocation=AudioLocation.FRONT_RIGHT + ), + ), + ], + ) + ], + ) + assert ( + BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement)) + == basic_audio_announcement + ) + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pacs():