From 4f1f19652e7e5f3604121af52802ab4f7e66b78d Mon Sep 17 00:00:00 2001 From: Andrei Litvin Date: Tue, 8 Oct 2024 16:23:01 -0400 Subject: [PATCH] Revert file change that I did not mean to change --- .../linux-cirque/helper/CHIPTestBase.py | 278 +++++++----------- 1 file changed, 101 insertions(+), 177 deletions(-) diff --git a/src/test_driver/linux-cirque/helper/CHIPTestBase.py b/src/test_driver/linux-cirque/helper/CHIPTestBase.py index 74c69bc441f969..ea45a62221e7e5 100644 --- a/src/test_driver/linux-cirque/helper/CHIPTestBase.py +++ b/src/test_driver/linux-cirque/helper/CHIPTestBase.py @@ -35,13 +35,13 @@ class TestResult(IntEnum): SYSTEM_FAILURE = 2 -""" +''' CHIPVirtualHome is a base class for single home tests child classes should implement: - setup() - test_routine() - tear_down() -""" +''' class CHIPVirtualHome: @@ -95,31 +95,31 @@ def query_api(self, end_point, args=[], binary=False): def execute_device_cmd(self, device_id, cmd, stream=False): self.logger.info( - "device: {} exec: {}".format(self.get_device_pretty_id(device_id), cmd) - ) - ret = requests.get( - self._build_request_url("device_cmd", [self.home_id, device_id, cmd]), - params={"stream": stream}, - stream=stream, - ) + "device: {} exec: {}".format(self.get_device_pretty_id(device_id), cmd)) + ret = requests.get(self._build_request_url('device_cmd', [self.home_id, device_id, cmd]), + params={'stream': stream}, + stream=stream) if stream: return ret ret_struct = ret.json() - command_ret_code = ret_struct.get("return_code", None) + command_ret_code = ret_struct.get('return_code', None) if command_ret_code is None: # could be 0 self.logger.error("cannot get command return code") raise Exception("cannot get command return code") self.logger.info( - "command return code: {}".format(ret_struct.get("return_code", "Unknown")) + "command return code: {}".format( + ret_struct.get('return_code', 'Unknown')) ) - command_output = ret_struct.get("output", None) + command_output = ret_struct.get('output', None) if command_output is None: # could be empty string self.logger.error("cannot get command output") raise Exception("cannot get command output") - self.logger.info("command output: \n{}".format(ret_struct.get("output", ""))) + self.logger.info( + "command output: \n{}".format(ret_struct.get('output', '')) + ) return ret_struct def sequenceMatch(self, string, patterns): @@ -128,7 +128,7 @@ def sequenceMatch(self, string, patterns): self.logger.info('Finding string: "{}"'.format(s)) this_find = string.find(s, last_find) if this_find < 0: - self.logger.info("Not found") + self.logger.info('Not found') return False self.logger.info("Found at index={}".format(this_find)) last_find = this_find + len(s) @@ -142,66 +142,49 @@ def reset_thread_devices(self, devices: Union[List[str], str]): devices = [devices] for device_id in devices: # Wait for otbr-agent and CHIP server start - self.assertTrue( - self.wait_for_device_output( - device_id, "Thread Border Router started on AIL", 10 - ) - ) - self.assertTrue( - self.wait_for_device_output(device_id, "[SVR] Server Listening...", 15) - ) + self.assertTrue(self.wait_for_device_output( + device_id, "Thread Border Router started on AIL", 10)) + self.assertTrue(self.wait_for_device_output( + device_id, "[SVR] Server Listening...", 15)) # Clear default Thread network commissioning data - self.logger.info( - "Resetting thread network on {}".format( - self.get_device_pretty_id(device_id) - ) - ) - self.execute_device_cmd(device_id, "ot-ctl factoryreset") + self.logger.info("Resetting thread network on {}".format( + self.get_device_pretty_id(device_id))) + self.execute_device_cmd(device_id, 'ot-ctl factoryreset') self.check_device_thread_state( - device_id=device_id, expected_role="disabled", timeout=10 - ) + device_id=device_id, expected_role="disabled", timeout=10) def check_device_thread_state(self, device_id, expected_role, timeout): if isinstance(expected_role, str): expected_role = [expected_role] self.logger.info( - f"Waiting for expected role. {self.get_device_pretty_id(device_id)}: {expected_role}" - ) + f"Waiting for expected role. {self.get_device_pretty_id(device_id)}: {expected_role}") start = time.time() while time.time() < (start + timeout): - reply = self.execute_device_cmd(device_id, "ot-ctl state") - if reply["output"].split()[0] in expected_role: + reply = self.execute_device_cmd(device_id, 'ot-ctl state') + if reply['output'].split()[0] in expected_role: return time.sleep(0.5) self.logger.error( - f"Device {self.get_device_pretty_id(device_id)} does not reach expected role" - ) + f"Device {self.get_device_pretty_id(device_id)} does not reach expected role") raise AssertionError - def form_thread_network( - self, - device_id: str, - expected_role: Union[str, List[str]], - timeout: int = 15, - dataset: str = "", - ): + def form_thread_network(self, device_id: str, expected_role: Union[str, List[str]], timeout: int = 15, + dataset: str = ""): """ Start Thread Network with provided dataset. If dataset is not provided then default will be set. Function that will be also verifying if device start in expected role. """ if not dataset: - dataset = ( - "0e080000000000010000" - + "000300000c" - + "35060004001fffe0" - + "0208fedcba9876543210" - + "0708fd00000000001234" - + "0510ffeeddccbbaa99887766554433221100" - + "030e54657374696e674e6574776f726b" - + "0102d252" - + "041081cb3b2efa781cc778397497ff520fa50c0302a0ff" - ) + dataset = "0e080000000000010000" + \ + "000300000c" + \ + "35060004001fffe0" + \ + "0208fedcba9876543210" + \ + "0708fd00000000001234" + \ + "0510ffeeddccbbaa99887766554433221100" + \ + "030e54657374696e674e6574776f726b" + \ + "0102d252" + \ + "041081cb3b2efa781cc778397497ff520fa50c0302a0ff" ot_init_commands = [ "ot-ctl thread stop", @@ -212,33 +195,30 @@ def form_thread_network( "ot-ctl dataset active", ] self.logger.info( - f"Setting Thread dataset for {self.get_device_pretty_id(device_id)}: {dataset}" - ) + f"Setting Thread dataset for {self.get_device_pretty_id(device_id)}: {dataset}") for cmd in ot_init_commands: self.execute_device_cmd(device_id, cmd) self.check_device_thread_state( - device_id=device_id, expected_role=expected_role, timeout=timeout - ) + device_id=device_id, expected_role=expected_role, timeout=timeout) def connect_to_thread_network(self): - """ + ''' The dataset in this function is used to replace the default dataset generated by openthread. When the test writer is calling this function to setup a thread network, it means they just want a working IPv6 network or a working thread network and don't care about the detail of this network. - """ + ''' self.logger.info("Running commands to form default Thread network") for device in self.thread_devices: - self.wait_for_device_output(device["id"], "Border router agent started.", 5) + self.wait_for_device_output( + device['id'], "Border router agent started.", 5) otInitCommands = [ "ot-ctl thread stop", "ot-ctl ifconfig down", - ( - "ot-ctl dataset set active 0e080000000000010000000300000d35060004001fffe00208d" - "ead00beef00cafe0708fd01234567890abc051000112233445566778899aabbccddeeff030a4f" - "70656e546872656164010212340410ad463152f9622c7297ec6c6c543a63e70c0302a0ff" - ), + ("ot-ctl dataset set active 0e080000000000010000000300000d35060004001fffe00208d" + "ead00beef00cafe0708fd01234567890abc051000112233445566778899aabbccddeeff030a4f" + "70656e546872656164010212340410ad463152f9622c7297ec6c6c543a63e70c0302a0ff"), "ot-ctl ifconfig up", "ot-ctl thread start", "ot-ctl dataset active", # Emit @@ -246,19 +226,17 @@ def connect_to_thread_network(self): for device in self.thread_devices: # Set default openthread provisioning for cmd in otInitCommands: - self.execute_device_cmd(device["id"], cmd) + self.execute_device_cmd(device['id'], cmd) self.logger.info("Waiting for Thread network to be formed...") threadNetworkFormed = False for i in range(30): roles = list() for device in self.thread_devices: # We can only check the status of ot-agent by query its state. - reply = self.execute_device_cmd(device["id"], "ot-ctl state") - roles.append(reply["output"].split()[0]) - threadNetworkFormed = (roles.count("leader") == 1) and ( - roles.count("leader") + roles.count("router") + roles.count("child") - == len(self.thread_devices) - ) + reply = self.execute_device_cmd(device['id'], 'ot-ctl state') + roles.append(reply['output'].split()[0]) + threadNetworkFormed = (roles.count('leader') == 1) and (roles.count( + 'leader') + roles.count('router') + roles.count('child') == len(self.thread_devices)) if threadNetworkFormed: break time.sleep(1) @@ -266,46 +244,33 @@ def connect_to_thread_network(self): self.logger.info("Thread network formed") def enable_wifi_on_device(self): - ssid, psk = self.query_api("wifi_ssid_psk", [self.home_id]) + ssid, psk = self.query_api('wifi_ssid_psk', [self.home_id]) self.logger.info("wifi ap ssid: {}, psk: {}".format(ssid, psk)) for device in self.non_ap_devices: self.logger.info( "device: {} connecting to desired ssid: {}".format( - self.get_device_pretty_id(device["id"]), ssid - ) - ) - self.write_psk_to_wpa_supplicant_config(device["id"], ssid, psk) - self.kill_existing_wpa_supplicant(device["id"]) - self.start_wpa_supplicant(device["id"]) + self.get_device_pretty_id(device['id']), ssid)) + self.write_psk_to_wpa_supplicant_config(device['id'], ssid, psk) + self.kill_existing_wpa_supplicant(device['id']) + self.start_wpa_supplicant(device['id']) time.sleep(5) def get_device_thread_ip(self, device_id): - ret = self.execute_device_cmd(device_id, "ot-ctl ipaddr") + ret = self.execute_device_cmd(device_id, 'ot-ctl ipaddr') ipaddr_list = ret["output"].splitlines() for ipstr in ipaddr_list: try: self.logger.info( - "device: {} thread ip: {}".format( - self.get_device_pretty_id(device_id), ipstr - ) - ) + "device: {} thread ip: {}".format(self.get_device_pretty_id(device_id), ipstr)) ipaddr = ipaddress.ip_address(ipstr) if ipaddr.is_link_local: continue if not ipaddr.is_private: continue - if ( - re.match( - ( - "fd[0-9a-f]{2}:[0-9a-f]{4}:[0-9a-f]" - "{4}:[0-9a-f]{4}:0000:00ff:fe00:[0-9a-f]{4}" - ), - ipaddr.exploded, - ) - is not None - ): + if re.match(("fd[0-9a-f]{2}:[0-9a-f]{4}:[0-9a-f]" + "{4}:[0-9a-f]{4}:0000:00ff:fe00:[0-9a-f]{4}"), ipaddr.exploded) is not None: continue self.logger.info("Get Mesh-Local EID: {}".format(ipstr)) return str(ipaddr) @@ -315,17 +280,12 @@ def get_device_thread_ip(self, device_id): return None def get_device_log(self, device_id): - return self.query_api("device_log", [self.home_id, device_id], binary=True) + return self.query_api('device_log', [self.home_id, device_id], binary=True) def wait_for_device_output(self, device_id, pattern, timeout=1): due = time.time() + timeout while True: - if self.sequenceMatch( - self.get_device_log(device_id).decode(), - [ - pattern, - ], - ): + if self.sequenceMatch(self.get_device_log(device_id).decode(), [pattern, ]): return True if time.time() < due: time.sleep(1) @@ -334,11 +294,11 @@ def wait_for_device_output(self, device_id, pattern, timeout=1): return False def assertTrue(self, exp, note=None): - """ + ''' assert{True|False} assert(Not)Equal python unittest style functions that raise exceptions when condition not met - """ + ''' if exp is not True: if note: self.logger.error(note) @@ -365,69 +325,49 @@ def assertNotEqual(self, val1, val2, note=None): def _build_request_url(self, end_point, args=[]): if len(args) == 0: return urljoin(self.cirque_url, end_point) - return urljoin( - self.cirque_url, - "{}/{}".format(end_point, "/".join([str(argv) for argv in args])), - ) + return urljoin(self.cirque_url, "{}/{}".format(end_point, '/'.join([str(argv) for argv in args]))) def destroy_home(self): self.logger.info("destroying home: {}".format(self.home_id)) - self.query_api("destroy_home", [self.home_id]) + self.query_api('destroy_home', [self.home_id]) def initialize_home(self): home_id = requests.post( - self._build_request_url("create_home"), json=self.device_config - ).json() + self._build_request_url('create_home'), json=self.device_config).json() self.logger.info("home id: {} created!".format(home_id)) - self.assertTrue( - home_id in list(self.query_api("get_homes")), - "created home_id did not match id from get_homes!!", - ) + self.assertTrue(home_id in + list(self.query_api('get_homes')), + "created home_id did not match id from get_homes!!") self.home_id = home_id device_types = set() - created_devices = self.query_api("home_devices", [home_id]) + created_devices = self.query_api('home_devices', [home_id]) - self.logger.info( - "home id: {} devices: {}".format( - home_id, json.dumps(created_devices, indent=4, sort_keys=True) - ) - ) + self.logger.info("home id: {} devices: {}".format( + home_id, json.dumps(created_devices, indent=4, sort_keys=True))) for device in created_devices.values(): - device_types.add(device["type"]) + device_types.add(device['type']) wanted_device_types = set() for device in self.device_config.values(): - wanted_device_types.add(device["type"]) + wanted_device_types.add(device['type']) - self.assertEqual( - device_types, - wanted_device_types, - "created device does not match to device config!!", - ) + self.assertEqual(device_types, wanted_device_types, + "created device does not match to device config!!") self.device_config = created_devices self.device_ids = [device_id for device_id in self.device_config] - self.non_ap_devices = [ - device - for device in self.device_config.values() - if device["type"] != "wifi_ap" - ] - self.thread_devices = [ - device - for device in self.device_config.values() - if device["capability"].get("Thread", None) is not None - ] - self.ap_devices = [ - device - for device in self.device_config.values() - if device["type"] == "wifi_ap" - ] + self.non_ap_devices = [device for device in self.device_config.values() + if device['type'] != 'wifi_ap'] + self.thread_devices = [device for device in self.device_config.values() + if device['capability'].get('Thread', None) is not None] + self.ap_devices = [device for device in self.device_config.values() + if device['type'] == 'wifi_ap'] def save_device_logs(self): timestamp = int(time.time()) @@ -436,61 +376,47 @@ def save_device_logs(self): os.makedirs("logs") for device in self.non_ap_devices: - ret_log = self.get_device_log(device["id"]) + ret_log = self.get_device_log(device['id']) # Use this format for easier sort - f_name = "{}-{}-{}.log".format(device["type"], timestamp, device["id"][:8]) + f_name = '{}-{}-{}.log'.format(device['type'], + timestamp, device['id'][:8]) self.logger.debug("device log name: \n{}".format(f_name)) - with open(os.path.join(log_dir, f_name), "wb") as fp: + with open(os.path.join(log_dir, f_name), 'wb') as fp: fp.write(ret_log) def start_wpa_supplicant(self, device_id): - self.logger.info( - "device: {}: starting wpa_supplicant on device".format( - self.get_device_pretty_id(device_id) - ) - ) + self.logger.info("device: {}: starting wpa_supplicant on device" + .format(self.get_device_pretty_id(device_id))) start_wpa_supplicant_command = "".join( - [ - "wpa_supplicant -B -i wlan0 ", - "-c /etc/wpa_supplicant/wpa_supplicant.conf ", - "-f /var/log/wpa_supplicant.log -t -dd", - ] - ) + ["wpa_supplicant -B -i wlan0 ", + "-c /etc/wpa_supplicant/wpa_supplicant.conf ", + "-f /var/log/wpa_supplicant.log -t -dd"]) return self.execute_device_cmd(device_id, start_wpa_supplicant_command) def write_psk_to_wpa_supplicant_config(self, device_id, ssid, psk): - self.logger.info( - "device: {}: writing ssid, psk to wpa_supplicant config".format( - self.get_device_pretty_id(device_id) - ) - ) + self.logger.info("device: {}: writing ssid, psk to wpa_supplicant config" + .format(self.get_device_pretty_id(device_id))) write_psk_command = "".join( - [ - "sh -c 'wpa_passphrase {} {} >> ".format(ssid, psk), - "/etc/wpa_supplicant/wpa_supplicant.conf'", - ] - ) + ["sh -c 'wpa_passphrase {} {} >> ".format(ssid, psk), + "/etc/wpa_supplicant/wpa_supplicant.conf'"]) return self.execute_device_cmd(device_id, write_psk_command) def kill_existing_wpa_supplicant(self, device_id): - self.logger.info( - "device: {}: kill existing wpa_supplicant".format( - self.get_device_pretty_id(device_id) - ) - ) + self.logger.info("device: {}: kill existing wpa_supplicant" + .format(self.get_device_pretty_id(device_id))) - kill_wpa_supplicant_command = "killall wpa_supplicant" + kill_wpa_supplicant_command = 'killall wpa_supplicant' return self.execute_device_cmd(device_id, kill_wpa_supplicant_command) def get_device_pretty_name(self, device_id): device_obj = self.device_config.get(device_id, None) if device_obj is not None: - return device_obj["type"] + return device_obj['type'] return "" def get_device_pretty_id(self, device_id): @@ -498,6 +424,4 @@ def get_device_pretty_id(self, device_id): @property def default_base_image(cls): - return os.environ.get( - "CHIP_CIRQUE_BASE_IMAGE", "project-chip/chip-cirque-device-base" - ) + return os.environ.get("CHIP_CIRQUE_BASE_IMAGE", "project-chip/chip-cirque-device-base")