diff --git a/main.py b/main.py old mode 100644 new mode 100755 index 438420d..7d5ef37 --- a/main.py +++ b/main.py @@ -1,10 +1,13 @@ -from bleak import discover +#!/usr/bin/env python3 +from bleak import BleakScanner from asyncio import new_event_loop, set_event_loop, get_event_loop -from time import sleep, time_ns +from time import sleep from binascii import hexlify from json import dumps from sys import argv from datetime import datetime +import sys +import asyncio # Configure update duration (update after n seconds) UPDATE_DURATION = 1 @@ -16,37 +19,48 @@ recent_beacons = [] -def get_best_result(device): - recent_beacons.append({ +def get_best_result(device, adv_data): + try: + from time import time_ns + except ImportError: + from datetime import datetime + + def time_ns(): + now = datetime.now() + return int(now.timestamp() * 1e9) + + current_beacon = { "time": time_ns(), - "device": device - }) + "device": device, + "adv_data": adv_data + } + recent_beacons.append(current_beacon) strongest_beacon = None i = 0 while i < len(recent_beacons): if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS): recent_beacons.pop(i) continue - if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi): - strongest_beacon = recent_beacons[i]["device"] + if (strongest_beacon == None or strongest_beacon["adv_data"].rssi < recent_beacons[i]["adv_data"].rssi): + strongest_beacon = recent_beacons[i] i += 1 - if (strongest_beacon != None and strongest_beacon.address == device.address): - strongest_beacon = device + if (strongest_beacon != None and strongest_beacon["device"].address == device.address): + strongest_beacon = current_beacon - return strongest_beacon + return strongest_beacon["adv_data"] # Getting data with hex format async def get_device(): # Scanning for devices - devices = await discover() - for d in devices: + discovered_devices_and_advertisement_data = await BleakScanner.discover(return_adv=True) + for device, adv_data in discovered_devices_and_advertisement_data.values(): # Checking for AirPods - d = get_best_result(d) - if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']: - data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])) - data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))) + adv_data = get_best_result(device, adv_data) + if adv_data.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in adv_data.manufacturer_data: + data_hex = hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER])) + data_length = len(hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER]))) if data_length == AIRPODS_DATA_LENGTH: return data_hex return False @@ -54,11 +68,14 @@ async def get_device(): # Same as get_device() but it's standalone method instead of async def get_data_hex(): - new_loop = new_event_loop() - set_event_loop(new_loop) - loop = get_event_loop() - a = loop.run_until_complete(get_device()) - loop.close() + if sys.version_info < (3, 7): + new_loop = new_event_loop() + set_event_loop(new_loop) + loop = get_event_loop() + a = loop.run_until_complete(get_device()) + loop.close() + else: + a = asyncio.run(get_device()) return a @@ -84,6 +101,8 @@ def get_data(): model = "AirPods1" elif chr(raw[7]) == 'a': model = "AirPodsMax" + elif chr(raw[7]) == '4': + model = "AirPodsPro2" else: model = "unknown" @@ -130,19 +149,22 @@ def is_flipped(raw): def run(): output_file = argv[-1] - while True: - data = get_data() - - if data["status"] == 1: - json_data = dumps(data) - if len(argv) > 1: - f = open(output_file, "a") - f.write(json_data+"\n") - f.close() - else: - print(json_data) - - sleep(UPDATE_DURATION) + try: + while True: + data = get_data() + + if data["status"] == 1: + json_data = dumps(data) + if len(argv) > 1: + f = open(output_file, "a") + f.write(json_data+"\n") + f.close() + else: + print(json_data) + + sleep(UPDATE_DURATION) + except KeyboardInterrupt: + pass if __name__ == '__main__':