From 7dadfa51e1bb577e9fd85349e8963cbebae80399 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Sat, 16 Nov 2024 00:34:09 +0800 Subject: [PATCH] BIG Sync app --- apps/auracast.py | 133 ++++++++++++++++++++++++++++++++++++++++++----- bumble/device.py | 49 ++++++++++++----- 2 files changed, 157 insertions(+), 25 deletions(-) diff --git a/apps/auracast.py b/apps/auracast.py index 2b645605..508bb6a9 100644 --- a/apps/auracast.py +++ b/apps/auracast.py @@ -62,6 +62,7 @@ class BroadcastScanner(pyee.EventEmitter): class Broadcast(pyee.EventEmitter): name: str | None sync: bumble.device.PeriodicAdvertisingSync + broadcast_id: int rssi: int = 0 public_broadcast_announcement: Optional[ bumble.profiles.pbp.PublicBroadcastAnnouncement @@ -280,11 +281,14 @@ def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID ) ) or not ( - any( - ad - for ad in ads - if isinstance(ad, tuple) - and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE + broadcast_audio_annoucement := next( + ( + ad + for ad in ads + if isinstance(ad, tuple) + and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE + ), + None, ) ): return @@ -293,17 +297,27 @@ def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: bumble.core.AdvertisingData.BROADCAST_NAME ) assert isinstance(broadcast_name, str) or broadcast_name is None + assert isinstance(broadcast_audio_annoucement[1], bytes) if broadcast := self.broadcasts.get(advertisement.address): broadcast.update(advertisement) return bumble.utils.AsyncRunner.spawn( - self.on_new_broadcast(broadcast_name, advertisement) + self.on_new_broadcast( + broadcast_name, + advertisement, + bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes( + broadcast_audio_annoucement[1] + ).broadcast_id, + ) ) async def on_new_broadcast( - self, name: str | None, advertisement: bumble.device.Advertisement + self, + name: str | None, + advertisement: bumble.device.Advertisement, + broadcast_id: int, ) -> None: periodic_advertising_sync = await self.device.create_periodic_advertising_sync( advertiser_address=advertisement.address, @@ -311,7 +325,7 @@ async def on_new_broadcast( sync_timeout=self.sync_timeout, filter_duplicates=self.filter_duplicates, ) - broadcast = self.Broadcast(name, periodic_advertising_sync) + broadcast = self.Broadcast(name, periodic_advertising_sync, broadcast_id) broadcast.update(advertisement) self.broadcasts[advertisement.address] = broadcast periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) @@ -323,10 +337,11 @@ def on_broadcast_loss(self, broadcast: Broadcast) -> None: self.emit('broadcast_loss', broadcast) -class PrintingBroadcastScanner: +class PrintingBroadcastScanner(pyee.EventEmitter): def __init__( self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float ) -> None: + super().__init__() self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout) self.scanner.on('new_broadcast', self.on_new_broadcast) self.scanner.on('broadcast_loss', self.on_broadcast_loss) @@ -610,6 +625,80 @@ async def run_pair(transport: str, address: str) -> None: print("+++ Paired") +async def run_receive( + transport: str, broadcast_id: int, broadcast_code: str | None, sync_timeout: float +) -> None: + async with create_device(transport) as device: + if not device.supports_le_periodic_advertising: + print(color('Periodic advertising not supported', 'red')) + return + + scanner = BroadcastScanner(device, False, sync_timeout) + scan_result: asyncio.Future[BroadcastScanner.Broadcast] = ( + asyncio.get_running_loop().create_future() + ) + + def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None: + if scan_result.done(): + return + if broadcast.broadcast_id == broadcast_id: + scan_result.set_result(broadcast) + + scanner.on('new_broadcast', on_new_broadcast) + await scanner.start() + print('Start scanning...') + broadcast = await scan_result + print('Advertisement found:') + broadcast.print() + basic_audio_announcement_scanned = asyncio.Event() + + def on_change() -> None: + if ( + broadcast.basic_audio_announcement + and not basic_audio_announcement_scanned.is_set() + ): + basic_audio_announcement_scanned.set() + + broadcast.on('change', on_change) + if not broadcast.basic_audio_announcement: + print('Wait for Basic Audio Announcement...') + await basic_audio_announcement_scanned.wait() + print('Basic Audio Announcement found') + broadcast.print() + print('Stop scanning') + await scanner.stop() + print('Start sync to BIG') + assert broadcast.basic_audio_announcement + big_sync = await device.create_big_sync( + broadcast.sync, + bumble.device.BigSyncParameters( + big_sync_timeout=0x4000, + bis=[ + bis.index + for bis in broadcast.basic_audio_announcement.subgroups[0].bis + ], + ), + ) + for bis_link in big_sync.bis_links: + print(f'Setup ISO for BIS {bis_link.handle}') + await device.send_command( + bumble.hci.HCI_LE_Setup_ISO_Data_Path_Command( + connection_handle=bis_link.handle, + data_path_direction=bumble.hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST, + data_path_id=0, + codec_id=bumble.hci.CodingFormat( + codec_id=bumble.hci.CodecID.TRANSPARENT + ), + controller_delay=0, + codec_configuration=b'', + ), + check_result=True, + ) + bis_link.sink = print + + await asyncio.Event().wait() + + def run_async(async_command: Coroutine) -> None: try: asyncio.run(async_command) @@ -631,9 +720,7 @@ def run_async(async_command: Coroutine) -> None: # ----------------------------------------------------------------------------- @click.group() @click.pass_context -def auracast( - ctx, -): +def auracast(ctx): ctx.ensure_object(dict) @@ -691,6 +778,28 @@ def pair(ctx, transport, address): run_async(run_pair(transport, address)) +@auracast.command('receive') +@click.argument('transport') +@click.argument('broadcast_id', type=int) +@click.option( + '--broadcast-code', + metavar='BROADCAST_CODE', + type=str, + help='Boradcast encryption code in hex format', +) +@click.option( + '--sync-timeout', + metavar='SYNC_TIMEOUT', + type=float, + default=AURACAST_DEFAULT_SYNC_TIMEOUT, + help='Sync timeout (in seconds)', +) +@click.pass_context +def receive(ctx, transport, broadcast_id, broadcast_code, sync_timeout): + """Receive a broadcast source""" + run_async(run_receive(transport, broadcast_id, broadcast_code, sync_timeout)) + + def main(): logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) auracast() diff --git a/bumble/device.py b/bumble/device.py index e8bc9788..71433c27 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -1005,8 +1005,7 @@ class BigParameters: phy: hci.PhyBit = hci.PhyBit.LE_2M packing: int = 0 framing: int = 0 - encryption: int = 0 - broadcast_code: bytes = bytes(16) + broadcast_code: bytes | None = None # ----------------------------------------------------------------------------- @@ -1037,7 +1036,7 @@ class Event(str, Enum): irc: int = 0 max_pdu: int = 0 iso_interval: int = 0 - bis_handles: Iterable[int] = () + bis_links: Iterable[BisLink] = () def __post_init__(self) -> None: super().__init__() @@ -1066,11 +1065,10 @@ async def terminate( # ----------------------------------------------------------------------------- @dataclass class BigSyncParameters: - encryption: int - broadcast_code: int - mse: int big_sync_timeout: int bis: list[int] + mse: int = 0 + broadcast_code: bytes | None = None # ----------------------------------------------------------------------------- @@ -1099,7 +1097,7 @@ class Event(str, Enum): irc: int = 0 max_pdu: int = 0 iso_interval: int = 0 - bis_handles: Iterable[int] = () + bis_links: Iterable[BisLink] = () def __post_init__(self) -> None: super().__init__() @@ -1353,6 +1351,17 @@ async def disconnect( await self.device.disconnect(self, reason) +# ----------------------------------------------------------------------------- +@dataclass +class BisLink: + handle: int + big: Big | BigSync + sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None + + def __post_init__(self) -> None: + self.device = self.big.device + + # ----------------------------------------------------------------------------- class Connection(CompositeEventEmitter): device: Device @@ -1838,6 +1847,7 @@ class Device(CompositeEventEmitter): sco_links: Dict[int, ScoLink] cis_links: Dict[int, CisLink] bigs = dict[int, Big]() + bis_links = dict[int, BisLink]() big_syncs = dict[int, BigSync]() _pending_cis: Dict[int, Tuple[int, int]] @@ -4271,8 +4281,8 @@ async def create_big( phy=parameters.phy, packing=parameters.packing, framing=parameters.framing, - encryption=parameters.encryption, - broadcast_code=parameters.broadcast_code, + encryption=1 if parameters.broadcast_code else 0, + broadcast_code=parameters.broadcast_code or bytes(16), ), check_result=True, ) @@ -4310,10 +4320,11 @@ async def create_big_sync( hci.HCI_LE_BIG_Create_Sync_Command( big_handle=big_handle, sync_handle=pa_sync_handle, - encryption=parameters.encryption, - broadcast_code=parameters.broadcast_code, + encryption=1 if parameters.broadcast_code else 0, + broadcast_code=parameters.broadcast_code or bytes(16), mse=parameters.mse, big_sync_timeout=parameters.big_sync_timeout, + bis=parameters.bis, ), check_result=True, ) @@ -4485,7 +4496,7 @@ def on_big_establishment( big.emit(Big.Event.ESTABLISHMENT_FAILURE, status) return - big.bis_handles = bis_handles[:] + big.bis_links = [BisLink(handle=handle, big=big) for handle in bis_handles] big.big_sync_delay = big_sync_delay big.transport_latency_big = transport_latency_big big.phy = phy @@ -4497,6 +4508,8 @@ def on_big_establishment( big.iso_interval = iso_interval big.state = Big.State.ACTIVE + for bis_link in big.bis_links: + self.bis_links[bis_link.handle] = bis_link big.emit(Big.Event.ESTABLISHMENT) @host_event_handler @@ -4506,6 +4519,8 @@ def on_big_termination(self, reason: int, big_handle: int) -> None: return big.state = Big.State.TERMINTED + for bis_link in big.bis_links: + self.bis_links.pop(bis_link.handle, None) big.emit(Big.Event.TERMINATION, reason) @host_event_handler @@ -4540,9 +4555,13 @@ def on_big_sync_establishment( big_sync.irc = irc big_sync.max_pdu = max_pdu big_sync.iso_interval = iso_interval - big_sync.bis_handles = bis_handles + big_sync.bis_links = [ + BisLink(handle=handle, big=big_sync) for handle in bis_handles + ] big_sync.state = BigSync.State.ACTIVE + for bis_link in big_sync.bis_links: + self.bis_links[bis_link.handle] = bis_link big_sync.emit(BigSync.Event.ESTABLISHMENT) @host_event_handler @@ -4551,6 +4570,8 @@ def on_big_sync_lost(self, big_handle: int, reason: int) -> None: logger.warning('BIG %d not found', big_handle) return + for bis_link in big_sync.bis_links: + self.bis_links.pop(bis_link.handle, None) big_sync.state = BigSync.State.TERMINTED big_sync.emit(BigSync.Event.TERMINATION, reason) @@ -5200,6 +5221,8 @@ def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: def on_iso_packet(self, handle: int, packet: hci.HCI_IsoDataPacket) -> None: if (cis_link := self.cis_links.get(handle)) and cis_link.sink: cis_link.sink(packet) + elif (bis_link := self.bis_links.get(handle)) and bis_link.sink: + bis_link.sink(packet) @host_event_handler @with_connection_from_handle