diff --git a/apps/rfcomm_bridge.py b/apps/rfcomm_bridge.py index c8d30524..f25fc0fa 100644 --- a/apps/rfcomm_bridge.py +++ b/apps/rfcomm_bridge.py @@ -18,17 +18,16 @@ import asyncio import logging import os +import time from typing import Optional import click from bumble.colors import color -from bumble.device import Device +from bumble.device import Device, Connection from bumble import core from bumble import hci -from bumble import l2cap from bumble import rfcomm -from bumble import sdp from bumble import transport from bumble import utils @@ -40,43 +39,35 @@ DEFAULT_MTU = 4096 DEFAULT_TCP_PORT = 9544 +TRACE_MAX_SIZE = 48 + # ----------------------------------------------------------------------------- -def make_sdp_records(channel: int, uuid: str): - return { - 0x00010001: [ - sdp.ServiceAttribute( - sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - sdp.DataElement.unsigned_integer_32(0x00010001), - ), - sdp.ServiceAttribute( - sdp.SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, - sdp.DataElement.sequence( - [sdp.DataElement.uuid(sdp.SDP_PUBLIC_BROWSE_ROOT)] - ), - ), - sdp.ServiceAttribute( - sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - sdp.DataElement.sequence([sdp.DataElement.uuid(core.UUID(uuid))]), - ), - sdp.ServiceAttribute( - sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - sdp.DataElement.sequence( - [ - sdp.DataElement.sequence( - [sdp.DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID)] - ), - sdp.DataElement.sequence( - [ - sdp.DataElement.uuid(core.BT_RFCOMM_PROTOCOL_ID), - sdp.DataElement.unsigned_integer_8(channel), - ] - ), - ] - ), - ), - ] - } +class Tracer: + """ + Trace data buffers transmitted from one endpoint to another, with stats. + """ + + def __init__(self, channel_name: str) -> None: + self.channel_name = channel_name + self.last_ts: float = 0.0 + + def trace_data(self, data: bytes) -> None: + now = time.time() + elapsed_s = now - self.last_ts if self.last_ts else 0 + elapsed_ms = int(elapsed_s * 1000) + instant_throughput_kbps = ((len(data) / elapsed_s) / 1000) if elapsed_s else 0.0 + + hex_str = data[:TRACE_MAX_SIZE].hex() + ( + "..." if len(data) > TRACE_MAX_SIZE else "" + ) + print( + f"[{self.channel_name}] {len(data):4} bytes " + f"(+{elapsed_ms:4}ms, {instant_throughput_kbps: 7.2f}kB/s) " + f" {hex_str}" + ) + + self.last_ts = now # ----------------------------------------------------------------------------- @@ -93,13 +84,24 @@ class ServerBridge: READ_CHUNK_SIZE = 4096 - def __init__(self, channel: int, uuid: str, tcp_host: str, tcp_port: int): + def __init__( + self, channel: int, uuid: str, trace: bool, tcp_host: str, tcp_port: int + ) -> None: self.device: Optional[Device] = None self.channel = channel self.uuid = uuid self.tcp_host = tcp_host self.tcp_port = tcp_port self.rfcomm_channel: Optional[rfcomm.DLC] = None + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None async def start(self, device: Device) -> None: self.device = device @@ -111,7 +113,9 @@ async def start(self, device: Device) -> None: self.channel = rfcomm_server.listen(self.on_rfcomm_channel, self.channel) # Setup the SDP to advertise this channel - self.device.sdp_service_records = make_sdp_records(self.channel, self.uuid) + self.device.sdp_service_records = rfcomm.make_service_sdp_records( + 0x00010001, self.channel, core.UUID(self.uuid) + ) # We're ready for a connection self.device.on("connection", self.on_connection) @@ -134,7 +138,7 @@ async def set_available(self, available: bool): await self.device.set_discoverable(available) def on_connection(self, connection): - print(color(f"@@@ Bluetooth connection: {connection}", "green")) + print(color(f"@@@ Bluetooth connection: {connection}", "blue")) connection.on("disconnection", self.on_disconnection) # Don't accept new connections until we're disconnected @@ -142,7 +146,7 @@ def on_connection(self, connection): def on_disconnection(self, reason: int): print( - color("@@@ Bluetooth disconnection:", "red"), + color("@@@ Bluetooth disconnection:", "blue"), hci.HCI_Constant.error_name(reason), ) @@ -157,7 +161,7 @@ async def on_rfcomm_channel(self, rfcomm_channel): # Connect to the TCP server print( color( - f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}...", + f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}", "yellow", ) ) @@ -173,7 +177,13 @@ def on_rfcomm_channel_closed(): print(color("*** RFCOMM channel closed", "cyan")) writer.close() - rfcomm_channel.sink = writer.write + def write_rfcomm_data(data): + if self.rfcomm_tracer: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.sink = write_rfcomm_data rfcomm_channel.on("close", on_rfcomm_channel_closed) # Pipe data from TCP to RFCOMM @@ -187,6 +197,9 @@ def on_rfcomm_channel_closed(): await rfcomm_channel.disconnect() return + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + rfcomm_channel.write(data) await rfcomm_channel.drain() except Exception as error: @@ -212,80 +225,83 @@ class ClientBridge: READ_CHUNK_SIZE = 4096 - def __init__(self, channel: int, uuid: str, address: str, tcp_host, tcp_port): + def __init__( + self, + channel: int, + uuid: str, + trace: bool, + address: str, + tcp_host: str, + tcp_port: int, + encrypt: bool, + ): self.channel = channel self.uuid = uuid + self.trace = trace self.address = address self.tcp_host = tcp_host self.tcp_port = tcp_port + self.encrypt = encrypt + self.device: Optional[Device] = None + self.connection: Optional[Connection] = None + self.rfcomm_client: Optional[rfcomm.Client] + self.rfcomm_mux: Optional[rfcomm.Multiplexer] + self.tcp_connected: bool = False + + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None + + async def connect(self) -> None: + if self.connection: + return - async def start(self, device): - print(color(f"### Connecting to {self.address}...", "yellow")) - connection = await device.connect( + print(color(f"@@@ Connecting to Bluetooth {self.address}", "blue")) + assert self.device + self.connection = await self.device.connect( self.address, transport=core.BT_BR_EDR_TRANSPORT ) - print(color("### Connected", "green")) + print(color(f"@@@ Bluetooth connection: {self.connection}", "blue")) + + if self.encrypt: + print(color("@@@ Encrypting Bluetooth connection", "blue")) + await self.connection.encrypt() + + self.rfcomm_client = rfcomm.Client(self.connection) + self.rfcomm_mux = await self.rfcomm_client.start() def on_disconnection(reason): print( color("@@@ Bluetooth disconnection:", "red"), hci.HCI_Constant.error_name(reason), ) + self.connection = None - connection.on("disconnection", on_disconnection) + self.connection.on("disconnection", on_disconnection) - # Resolve the channel number from the UUID if needed - if self.channel == 0: - self.channel = await rfcomm.find_rfcomm_channel_with_uuid( - connection, self.uuid - ) - print(color(f"@@@ Found RFCOMM channel {self.channel}", "cyan")) + async def start(self, device: Device) -> None: + self.device = device # Called when a TCP connection is established async def on_tcp_connection(reader, writer): - peer_name = writer.get_extra_info("peer_name") - print(color(f"<<< TCP connection from {peer_name}", "magenta")) - - # Connect a new RFCOMM channel - print(color(f'### Opening session for channel {self.channel}...', 'yellow')) - rfcomm_client = rfcomm.Client(connection) - rfcomm_mux = await rfcomm_client.start() - try: - rfcomm_channel = await rfcomm_mux.open_dlc(self.channel) - print(color(f'### RFCOMM channel open: {rfcomm_channel}', "yellow")) - except core.ConnectionError as error: - print(color(f'!!! RFCOMM open failed: {error}', 'red')) - await rfcomm_mux.disconnect() - return - - # Pipe data from RFCOMM to TCP - def on_rfcomm_channel_closed(): - print(color("*** RFCOMM channel closed", "cyan")) + print(color("<<< TCP connection", "magenta")) + if self.tcp_connected: + print( + color("!!! TCP connection already active, rejecting new one", "red") + ) writer.close() + return + self.tcp_connected = True - rfcomm_channel.on("close", on_rfcomm_channel_closed) - rfcomm_channel.sink = writer.write - - # Pipe data from TCP to RFCOMM - while True: - try: - data = await reader.read(self.READ_CHUNK_SIZE) - - if len(data) == 0: - print(color("### TCP end of stream", "yellow")) - if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: - await rfcomm_channel.disconnect() - return - - rfcomm_channel.write(data) - await rfcomm_channel.drain() - except Exception as error: - print(f"!!! Exception: {error}") - break - + await self.pipe(reader, writer) writer.close() await writer.wait_closed() - print(color("~~~ Bye bye", "magenta")) await asyncio.start_server( on_tcp_connection, @@ -298,6 +314,71 @@ def on_rfcomm_channel_closed(): ) ) + async def pipe( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + # Resolve the channel number from the UUID if needed + if self.channel == 0: + await self.connect() + assert self.connection + channel = await rfcomm.find_rfcomm_channel_with_uuid( + self.connection, self.uuid + ) + if channel: + print(color(f"### Found RFCOMM channel {channel}", "yellow")) + else: + print(color(f"!!! RFCOMM channel with UUID {self.uuid} not found")) + return + else: + channel = self.channel + + # Connect a new RFCOMM channel + await self.connect() + assert self.rfcomm_mux + print(color(f'*** Opening RFCOMM channel {channel}', 'green')) + try: + rfcomm_channel = await self.rfcomm_mux.open_dlc(channel) + print(color(f'*** RFCOMM channel open: {rfcomm_channel}', "green")) + except Exception as error: + print(color(f'!!! RFCOMM open failed: {error}', 'red')) + return + + # Pipe data from RFCOMM to TCP + def on_rfcomm_channel_closed(): + print(color("*** RFCOMM channel closed", "green")) + + def write_rfcomm_data(data): + if self.trace: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.on("close", on_rfcomm_channel_closed) + rfcomm_channel.sink = write_rfcomm_data + + # Pipe data from TCP to RFCOMM + while True: + try: + data = await reader.read(self.READ_CHUNK_SIZE) + + if len(data) == 0: + print(color("### TCP end of stream", "yellow")) + if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: + await rfcomm_channel.disconnect() + self.tcp_connected = False + return + + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + + rfcomm_channel.write(data) + await rfcomm_channel.drain() + except Exception as error: + print(f"!!! Exception: {error}") + break + + print(color("~~~ Bye bye", "magenta")) + # ----------------------------------------------------------------------------- async def run(device_config, hci_transport, bridge): @@ -313,10 +394,15 @@ async def run(device_config, hci_transport, bridge): # Let's go await device.power_on() - await bridge.start(device) + try: + await bridge.start(device) - # Wait until the transport terminates - await hci_source.wait_for_termination() + # Wait until the transport terminates + await hci_source.wait_for_termination() + except core.ConnectionError as error: + print(color(f"!!! Bluetooth connection failed: {error}", "red")) + except Exception as error: + print(f"Exception while running bridge: {error}") # ----------------------------------------------------------------------------- @@ -324,18 +410,21 @@ async def run(device_config, hci_transport, bridge): @click.pass_context @click.option("--device-config", help="Device configuration file", required=True) @click.option("--hci-transport", help="HCI transport", required=True) +@click.option("--trace", is_flag=True, help="Trace bridged data to stdout") @click.option("--channel", help="RFCOMM channel number", type=int, default=0) @click.option("--uuid", help="UUID for the RFCOMM channel", default=DEFAULT_RFCOMM_UUID) def cli( context, device_config, hci_transport, + trace, channel, uuid, ): context.ensure_object(dict) context.obj["device_config"] = device_config context.obj["hci_transport"] = hci_transport + context.obj["trace"] = trace context.obj["channel"] = channel context.obj["uuid"] = uuid @@ -349,6 +438,7 @@ def server(context, tcp_host, tcp_port): bridge = ServerBridge( context.obj["channel"], context.obj["uuid"], + context.obj["trace"], tcp_host, tcp_port, ) @@ -361,13 +451,16 @@ def server(context, tcp_host, tcp_port): @click.argument("bluetooth-address") @click.option("--tcp-host", help="TCP host", default="_") @click.option("--tcp-port", help="TCP port", default=DEFAULT_TCP_PORT) -def client(context, bluetooth_address, tcp_host, tcp_port): +@click.option("--encrypt", is_flag=True, help="Encrypt the connection") +def client(context, bluetooth_address, tcp_host, tcp_port, encrypt): bridge = ClientBridge( context.obj["channel"], context.obj["uuid"], + context.obj["trace"], bluetooth_address, tcp_host, tcp_port, + encrypt, ) asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge)) diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index d516b5ae..1467b05a 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -938,6 +938,9 @@ async def open_dlc( def on_dlc_open_complete(self, dlc: DLC) -> None: logger.debug(f'DLC [{dlc.dlci}] open complete') + + self.change_state(Multiplexer.State.CONNECTED) + if self.open_result: self.open_result.set_result(dlc) self.open_result = None