From 2f08491c04579dfa37052ddc10ab87f9c411bc78 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Fri, 13 Dec 2024 17:02:25 +0100 Subject: [PATCH] Implement the automatiq update --- Classes/WebServer/rest_ReplaceDevice.py | 184 ++++++++++++++++++++++-- 1 file changed, 172 insertions(+), 12 deletions(-) diff --git a/Classes/WebServer/rest_ReplaceDevice.py b/Classes/WebServer/rest_ReplaceDevice.py index d7f4b15a6..aa2a7bcc6 100644 --- a/Classes/WebServer/rest_ReplaceDevice.py +++ b/Classes/WebServer/rest_ReplaceDevice.py @@ -87,16 +87,7 @@ def get_device_clustertype_info(self, parameters): return _response try: - build_clustertype_info = {"DeviceIEEE": device_info.get("IEEE", "")} - - # Process endpoints and their cluster types - for ep, ep_info in device_info.get("Ep", {}).items(): - cluster_type = ep_info.get("ClusterType", {}) - build_clustertype_info[ep] = [ - {"WidgetIdx": widget_idx, "WidgetType": widget_type} - for widget_idx, widget_type in cluster_type.items() - ] - + build_clustertype_info = get_list_of_clustertype( device_info) _response["Data"] = json.dumps(build_clustertype_info, sort_keys=False) except Exception as e: @@ -171,7 +162,133 @@ def update_device(self, data): def update_device_automatically(self, old_ieee, new_ieee, _response): - pass + """ + Updates device information automatically by replacing the old device with the new one. + + This function retrieves the network IDs (`nwkid`) for the old and new devices using their + IEEE addresses. It then checks if the devices exist and compares their endpoint structure + (i.e., the number of endpoints). If there are discrepancies, an error is logged and returned. + + Parameters: + old_ieee (str): The IEEE address of the old device to update. + new_ieee (str): The IEEE address of the new device to replace the old one. + _response (dict): A dictionary to store the response, which is updated with success or error status. + + Returns: + dict: The updated response dictionary, containing the status or error message. + """ + + # Retrieve network IDs for the old and new devices + old_nwkid = self.IEEE2NWK.get(old_ieee) + new_nwkid = self.IEEE2NWK.get(new_ieee) + + # Check if network IDs exist for the given IEEE addresses + if old_nwkid is None or new_nwkid is None: + error_msg = f"update_device_automatically - Unknown device {old_ieee} or {new_ieee}" + self.logging("Error", error_msg) + _response["Data"] = {"error": error_msg} + return _response + + # Retrieve device information for both old and new devices + old_device_info = self.ListOfDevices.get(old_nwkid) + new_device_info = self.ListOfDevices.get(new_nwkid) + + # Check if device information was found + if old_device_info is None or new_device_info is None: + error_msg = f"update_device_automatically - Unable to get device information for {old_ieee} or {new_ieee}" + self.logging("Error", error_msg) + _response["Data"] = {"error": error_msg} + return _response + + # Retrieve cluster type information for both devices + old_clustertype_info = get_list_of_clustertype(old_device_info) + new_clustertype_info = get_list_of_clustertype(new_device_info) + + # Check if the number of endpoints matches between the old and new devices + old_cluster_type = old_clustertype_info.get("ClusterType", {}) + new_cluster_type = new_clustertype_info.get("ClusterType", {}) + + if len(old_cluster_type) != len(new_cluster_type): + error_msg = f"update_device_automatically - Different structure (number of endpoints) between {old_ieee} and {new_ieee}" + self.logging("Error", error_msg) + _response["Data"] = {"error": error_msg} + return _response + + # If the structure is the same, proceed with the update logic here (not shown) + update_device_automatically_widget_idx(self, old_clustertype_info, new_clustertype_info, new_nwkid) + + # On success, update the response + _response["Data"] = {"status": "success"} + return _response + + +def update_device_automatically_widget_idx(self, old_clustertype_info, new_clustertype_info, new_nwkid): + """ + Automatically updates the WidgetIdx of widgets in the new cluster information + to match the WidgetIdx from the old cluster information for devices with the same + WidgetType. + + This function iterates through the old and new cluster types, comparing widgets + by their WidgetType. When a match is found, the WidgetIdx in the new version is + updated to the WidgetIdx from the old version. + + Args: + old_clustertype_info (dict): The old cluster type information containing + the WidgetIdx and WidgetType for each widget. + new_clustertype_info (dict): The new cluster type information containing + the WidgetIdx and WidgetType for each widget. + new_nwkid (str): The network ID of the device to update. + + Logs: + Logs the process of matching and updating WidgetIdx for each widget type. + """ + # Initialize flag to track if any update was made + update_made = False + + # Log entry into the function + self.logging("Log", f"Starting widget index update for new_nwkid: {new_nwkid}") + + # Iterate over the cluster types in the old version + for old_ep, old_clustertype_info in old_clustertype_info.get("ClusterType", {}).items(): + # Log the current endpoint being processed + self.logging("Log", f"Processing old endpoint: {old_ep}") + + # Check if the same endpoint exists in the new version + if old_ep not in new_clustertype_info.get("ClusterType", {}): + self.logging("Log", f"Endpoint {old_ep} not found in new version. Skipping.") + continue + + # Get the new cluster info for matching endpoint + new_clustertype_info = new_clustertype_info["ClusterType"][old_ep] + self.logging("Log", f"Found matching endpoint {old_ep}, processing widgets.") + + # Iterate over widgets in the old and new cluster types + for old_widget in old_clustertype_info: + self.logging("Log", f"Processing old widget: {old_widget}") + for new_widget in new_clustertype_info: + self.logging("Log", f"Processing new widget: {new_widget}") + + if new_widget['WidgetType'] == old_widget['WidgetType']: + target_widget_idx = new_widget['WidgetIdx'] + new_widget_idx = old_widget['WidgetIdx'] + + # Log the matching widgets and their indices + self.logging("Log", f"Match found for WidgetType {old_widget['WidgetType']} " + + f"with old WidgetIdx {new_widget_idx} and new WidgetIdx {target_widget_idx}") + + # Call the update function with the appropriate parameters + update_successful = update_device_widgetidx(self, new_nwkid, target_widget_idx, new_widget_idx) + + if update_successful: + update_made = True + # Log the successful update + self.logging("Log", f"Updated WidgetIdx for {old_widget['WidgetType']} " + + f"from {new_widget_idx} to {target_widget_idx}") + break # Stop once we find the matching WidgetType (assuming WidgetType is unique) + + # Log function exit + self.logging("Log", "Widget index update process completed.") + return update_made def update_device_manually(self, ieee, nwkid, data, _response): @@ -186,7 +303,7 @@ def update_device_manually(self, ieee, nwkid, data, _response): Parameters: ieee (str): The IEEE address of the device to update. nwkid (str): The network ID of the device to update. - data (dict): A dictionary containing the 'WidgetIdx' (current widget index) + data (dict): A dictionary containing the 'WidgetIdx' (current widget index) and 'ReplaceByIdx' (new widget index). _response (dict): A dictionary that will be updated with the status or error message. @@ -257,3 +374,46 @@ def update_widget_idx(cluster_type, current_idx, new_idx): cluster_type[new_idx] = cluster_type.pop(current_idx) return True return False + + +def get_list_of_clustertype(device_info): + """ + Extracts the cluster type information from the device's endpoints. + + This function processes the device's endpoint information, extracting the + widget index (`WidgetIdx`) and widget type (`WidgetType`) from each + endpoint's `ClusterType`. It returns a dictionary containing the device's + IEEE address and the cluster type information for each endpoint. + + Parameters: + device_info (dict): A dictionary containing the device's details, + including the IEEE address and endpoint data. + + Returns: + dict: A dictionary containing the device's IEEE address and the cluster + type information for each endpoint. The structure of the returned + dictionary is as follows: + { + "DeviceIEEE": , + "ClusterType": { + "": [{"WidgetIdx": , "WidgetType": }, ...], + ... + } + } + """ + build_clustertype_info = {"DeviceIEEE": device_info.get("IEEE", ""), "ClusterType": {}} + + # Process endpoints and their cluster types + for ep, ep_info in device_info.get("Ep", {}).items(): + cluster_type = ep_info.get("ClusterType", {}) + + # If 'ClusterType' is found, update the 'ClusterType' dictionary + if cluster_type: + build_clustertype_info["ClusterType"][ep] = [ + {"WidgetIdx": widget_idx, "WidgetType": widget_type} + for widget_idx, widget_type in cluster_type.items() + ] + else: + build_clustertype_info["ClusterType"][ep] = [] # Ensure an empty list if no ClusterType + + return build_clustertype_info