diff --git a/btlewrap/__init__.py b/btlewrap/__init__.py index 7e1de19..fad905d 100644 --- a/btlewrap/__init__.py +++ b/btlewrap/__init__.py @@ -17,9 +17,10 @@ from btlewrap.bluepy import BluepyBackend # noqa: E402 # pylint: disable=wrong-import-position from btlewrap.gatttool import GatttoolBackend # noqa: E402 # pylint: disable=wrong-import-position from btlewrap.pygatt import PygattBackend # noqa: E402 # pylint: disable=wrong-import-position +from btlewrap.socket import SocketBackend # noqa: E402 # pylint: disable=wrong-import-position -_ALL_BACKENDS = [BluepyBackend, GatttoolBackend, PygattBackend] +_ALL_BACKENDS = [BluepyBackend, GatttoolBackend, PygattBackend, SocketBackend] def available_backends(): diff --git a/btlewrap/socket.py b/btlewrap/socket.py new file mode 100644 index 0000000..b2c562d --- /dev/null +++ b/btlewrap/socket.py @@ -0,0 +1,210 @@ +"""Backend using L2CAP and HCI sockets, primarily intended for FreeBSD. +""" + +import ctypes +import ctypes.util +import logging +import socket +import struct +import time +from typing import List, Tuple, Callable +from btlewrap.base import AbstractBackend, BluetoothBackendException + + +_LOGGER = logging.getLogger(__name__) +SOL_HCI_RAW = 0x0802 +SOL_HCI_RAW_FILTER = 1 +NG_HCI_EVENT_MASK_LE = 0x2000000000000000 +LE_META_EVENT = 0x3e +EVT_LE_ADVERTISING_REPORT = 0x02 +OGF_LE_CTL = 0x8 +OCF_LE_SET_EVENT_MASK = 0x1 +OCF_LE_SET_SCAN_PARAMETERS = 0xB +OCF_LE_SET_SCAN_ENABLE = 0xC + + +class SockaddrL2cap(ctypes.Structure): + _fields_ = [ + ('l2cap_len', ctypes.c_char), + ('l2cap_family', ctypes.c_char), + ('l2cap_psm', ctypes.c_int16), + ('l2cap_bdaddr', ctypes.c_int8 * 6), + ('l2cap_cid', ctypes.c_int16), + ('l2cap_bdaddr_type', ctypes.c_int8), + ] + + +class SockaddrHci(ctypes.Structure): + _fields_ = [ + ('hci_len', ctypes.c_char), + ('hci_family', ctypes.c_char), + ('hci_node', ctypes.c_char * 32), + ] + + +class HciRawFilter(ctypes.Structure): + _fields_ = [ + ('packet_mask', ctypes.c_uint32), + ('event_mask', ctypes.c_uint64), + ] + + +def hci_connect(libc, adapter: str): + sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) + adr = SockaddrHci(ctypes.sizeof(SockaddrHci), socket.AF_BLUETOOTH, + (adapter + 'hci').ljust(32, '\0').encode('utf-8')) + if libc.bind(sock.fileno(), ctypes.pointer(adr), ctypes.sizeof(SockaddrHci)) != 0: + raise BluetoothBackendException('Error {}'.format(ctypes.get_errno())) + if libc.connect(sock.fileno(), ctypes.pointer(adr), ctypes.sizeof(SockaddrHci)) != 0: + raise BluetoothBackendException('Error {}'.format(ctypes.get_errno())) + filter = HciRawFilter(0, NG_HCI_EVENT_MASK_LE) + if libc.setsockopt(sock.fileno(), + SOL_HCI_RAW, SOL_HCI_RAW_FILTER, + ctypes.pointer(filter), ctypes.sizeof(HciRawFilter)) != 0: + raise BluetoothBackendException('Error {}'.format(ctypes.get_errno())) + return sock + + +def hci_send_cmd(sock, gf: int, cf: int, data: bytes): + opcode = (((gf & 0x3f) << 10) | (cf & 0x3ff)) + sock.send(struct.pack(' bytes: + """Read a handle from the device. + + You must be connected to do this. + """ + if self._sock is None: + raise BluetoothBackendException('not connected to backend') + + self._sock.send(struct.pack(' bool: + """Check if the backend is available.""" + if not 'AF_BLUETOOTH' in dir(socket): + return False + if not 'SOCK_SEQPACKET' in dir(socket) or not 'SOCK_RAW' in dir(socket): + return False + if not 'BTPROTO_L2CAP' in dir(socket) or not 'BTPROTO_HCI' in dir(socket): + return False + return True + + @staticmethod + def scan_for_devices(timeout: float, adapter='ubt0') -> List[Tuple[str, str]]: + """Scan for Bluetooth Low Energy devices. + + Note: this needs to run as root!""" + libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) + hci = hci_connect(libc, adapter) + hci_set_ble_mask(hci) + hci_send_cmd(hci, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, + struct.pack('BB', False, False)) + params = struct.pack( + " 0: + print(timeout) + hci.settimeout(timeout) + data = None + try: + data = hci.recv(255) + except socket.timeout: + continue + timeout -= time.monotonic() - read_time + read_time = time.monotonic() + if data[1] != LE_META_EVENT or data[3] != EVT_LE_ADVERTISING_REPORT: + continue + mac = ':'.join(map(lambda x: '{:02X}'.format(x), reversed(data[7:13]))) + name = None + attrs = data[14:] + while len(attrs) > 2: + length = attrs[0] + if attrs[1] == 9: + name = str(attrs[2:length + 1]) + attrs = attrs[length + 1:] + result[mac] = name + hci_send_cmd(hci, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, + struct.pack('BB', False, False)) + return [(k, v) for k, v in result.items()]