Skip to content

Commit

Permalink
Implementation of ble-scan method with returning discovered devices (#18
Browse files Browse the repository at this point in the history
)

* Implemented ble_scan with peripheral list return.
  • Loading branch information
nirav1911 authored May 18, 2021
1 parent b2ff41f commit f6e104d
Showing 1 changed file with 75 additions and 4 deletions.
79 changes: 75 additions & 4 deletions src/controller/python/chip-device-ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from chip.ChipBleUtility import FAKE_CONN_OBJ_VALUE
from chip.setup_payload import SetupPayload
from xmlrpc.server import SimpleXMLRPCServer
from enum import Enum
from typing import Any, Dict,Optional

# Extend sys.path with one or more directories, relative to the location of the
# running script, in which the chip package might be found . This makes it
Expand Down Expand Up @@ -69,6 +71,16 @@
elif sys.platform.startswith('linux'):
from chip.ChipBluezMgr import BluezManager as BleManager


class StatusCodeEnum(Enum):
SUCCESS = 0
FAILED = 1

class RPCResponseKeyEnum(Enum):
STATUS = "status"
RESULT = "result"
ERROR = "error"

# The exceptions for CHIP Device Controller CLI


Expand Down Expand Up @@ -617,20 +629,68 @@ def emptyline(self):
device_manager = DeviceMgrCmd(rendezvousAddr=None,
controllerNodeId=0, bluetoothAdapter=0)


# CHIP commands needed by the Harness Tool
def echo_alive(message):
print(message)
return message

def ble_scan():
device_manager.do_blescan("")
#TODO: Return a list of available devices
return "Scan started"
def ble_scan() -> Dict[Any, Any]:
try:
__check_supported_os()
device_manager.do_blescan("")

return __get_response_dict(status = StatusCodeEnum.SUCCESS, result = __get_peripheral_list())
except Exception as e:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e))

def __get_peripheral_list() -> Dict[Any, Any]:
device_list = []
for device in device_manager.bleMgr.peripheral_list:
device_detail = {}
devIdInfo = device_manager.bleMgr.get_peripheral_devIdInfo(device)
if devIdInfo != None:
device_detail['name'] = str(device.Name)
device_detail['id'] = str(device.device_id)
device_detail['rssi'] = str(device.RSSI)
device_detail['address'] = str(device.Address)
device_detail['pairing_state'] = devIdInfo.pairingState
device_detail['discriminator'] = devIdInfo.discriminator
device_detail['vendor_id'] = devIdInfo.vendorId
device_detail['product_id'] = devIdInfo.productId
if device.ServiceData:
for advuuid in device.ServiceData:
device_detail['adv_uuid'] = str(advuuid)
device_list.append(device_detail)
return device_list

def ble_connect(discriminator: int, pin_code: int, node_id: int) -> Dict[str, any]:
try:
__check_supported_os()
device_manager.devCtrl.ConnectBLE(discriminator, pin_code, node_id)
return __get_response_dict(status = StatusCodeEnum.SUCCESS)
except exceptions.ChipStackException as ex:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex))
except Exception as e:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e))

def ip_connect(ip_address: string, pin_code: int, node_id: int) -> Dict[str, any]:
try:
__check_supported_os()
device_manager.devCtrl.ConnectIP(ip_address.encode("utf-8"), pin_code, node_id)
return __get_response_dict(status = StatusCodeEnum.SUCCESS)
except exceptions.ChipStackException as ex:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex))
except Exception as e:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e))

def start_rpc_server():

with SimpleXMLRPCServer(("0.0.0.0", 5000)) as server:
server.register_function(echo_alive)
server.register_function(ble_scan)
server.register_function(ble_connect)
server.register_function(ip_connect)
server.register_multicall_functions()
print('Serving XML-RPC on localhost port 5000')
try:
Expand All @@ -639,6 +699,17 @@ def start_rpc_server():
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)

def __get_response_dict(status: StatusCodeEnum, result: Optional[Dict[Any, Any]] = None, error:Optional[str] = None) -> Dict [str, Any]:
return { RPCResponseKeyEnum.STATUS.value : status.value, RPCResponseKeyEnum.RESULT.value : result, RPCResponseKeyEnum.ERROR.value : error }

def __check_supported_os()-> bool:
if platform.system().lower() == 'darwin':
raise Exception(platform.system() + " not supported")
elif sys.platform.lower().startswith('linux'):
return True

raise Exception("OS Not Supported")

######--------------------------------------------------######

def main():
Expand Down

0 comments on commit f6e104d

Please sign in to comment.