From 2c28bf2c0d2fe87418529fcc2b45443692a56f61 Mon Sep 17 00:00:00 2001 From: winniex1 Date: Mon, 4 Sep 2017 10:51:46 -0700 Subject: [PATCH 1/5] Property process improvement: 1. add get property feature support for iota dcc to sync local records with those in adapter when reboot occurs 2. make the feature of getting property after reboot configurable (off by default) 3. filter out system generated properties after calling "get properties" 4. add process for a case when device type == '' --- config/liota.conf | 1 + liota/dccs/iotcc.py | 68 ++++++++++++++++++++++++++++++++++++++++++--- requirements.txt | 2 +- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/config/liota.conf b/config/liota.conf index 13140bd9..4f09d85f 100644 --- a/config/liota.conf +++ b/config/liota.conf @@ -8,6 +8,7 @@ log_path = /var/log/liota dev_file_path = /usr/lib/liota/devs entity_file_path = /usr/lib/liota/entity iotcc_path = /usr/lib/liota/iotcc.json +enable_reboot_getprop = False [CORE_CFG] collect_thread_pool_size = 30 diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index 7570ca28..2151d0f8 100644 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -37,7 +37,9 @@ import ConfigParser import os import Queue +import datetime from time import gmtime, strftime +from uptime import boottime from threading import Lock from liota.dccs.dcc import DataCenterComponent, RegistrationFailure @@ -75,6 +77,8 @@ def __init__(self, con): self._iotcc_json = self._create_iotcc_json() self.counter = 0 self.recv_msg_queue = self.comms.userdata + self.boottime = boottime() + self.dev_file_path = self._get_file_storage_path("dev_file_path") # Liota internal entity file system path special for iotcc self.entity_file_path = self._get_file_storage_path("entity_file_path") @@ -99,6 +103,7 @@ def on_response(msg): self.comms.client.disconnect() log.error("HelixProtocolException: " + repr(error)) raise Exception("HelixProtocolException") + self.enable_reboot_getprop = read_liota_config('IOTCC_PATH', 'enable_reboot_getprop') def register(self, entity_obj): """ Register the objects @@ -240,6 +245,13 @@ def _properties(self, msg_id, entity_type, entity_id, entity_name, timestamp, pr msg["body"]["property_data"].append({"propertyKey": key, "propertyValue": value}) return msg + def _get_properties(self, msg_id, res_uuid): + return { + "transactionID": msg_id, + "type": "get_properties", + "uuid": res_uuid + } + def _format_data(self, reg_metric): met_cnt = reg_metric.values.qsize() if met_cnt == 0: @@ -400,7 +412,7 @@ def write_entity_json_file(self, prop_dict, attribute_list, uuid, remove): if prop_dict is not None: for key in prop_dict.iterkeys(): value = prop_dict[key] - if key == 'entity type' or key == 'name' or key == 'device type': + if key == 'entity type' or key == 'name' or key == 'device type' or key == 'Entity_Timestamp': continue attribute_list.append({key: value}) attribute_list.append({"LastSeenTimestamp": strftime("%Y-%m-%dT%H:%M:%S", gmtime())}) @@ -470,6 +482,7 @@ def store_device_info(self, uuid, name, dev_type, prop_dict, remove_device): def write_entity_file(self, prop_dict, res_uuid): file_path = self.entity_file_path + '/' + res_uuid + '.json' + prop_dict.update({"Entity_Timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}) try: with open(file_path, "w") as json_file: if (prop_dict is not None): @@ -488,10 +501,24 @@ def read_entity_file(self, res_uuid): log.error('Read file error') return prop_dict + def merge_prop_dict_list(self, prop_dict, prop_list): + # prop_dict: new property dictionary + # prop_list: list of dictionary items + if (prop_list is None): + return prop_dict + if (prop_dict is None): + prop_dict = {} + for item in prop_list: + for key in item: + if key.find("System Properties|") == -1: + prop_dict.update(item) + # get updated dict + return prop_dict + def store_reg_entity_attributes(self, entity_type, entity_name, reg_entity_id, dev_type, prop_dict): log.debug('store_reg_entity_attributes\n {0}:{1}:{2}:{3}'.format(entity_type, - entity_name, reg_entity_id, prop_dict)) + entity_name, reg_entity_id, prop_dict)) ### Update IOTCC local entity file first # look for uuid.json file first, if not, first time to write @@ -513,8 +540,16 @@ def store_reg_entity_attributes(self, entity_type, entity_name, reg_entity_id, new_prop_dict = tmp_dict else: tmp_dict = self.read_entity_file(reg_entity_id) - if ((tmp_dict["entity type"] == entity_type) and (tmp_dict["name"] == entity_name) - and (tmp_dict["device type"] == dev_type)): + # check Entity_Timestamp of entity_file: if < boottime, get properties from cloud + if (self.enable_reboot_getprop == "True") and ('Entity_Timestamp' in tmp_dict): + last_dtime = datetime.datetime.strptime(tmp_dict["Entity_Timestamp"], "%Y-%m-%dT%H:%M:%S") + if (last_dtime <= self.boottime): + list_prop = self.get_properties(reg_entity_id) + tmp_dict = self.merge_prop_dict_list(tmp_dict, list_prop) + if ((('entity type' in tmp_dict) and (tmp_dict["entity type"] == entity_type)) and + (('name' in tmp_dict) and (tmp_dict["name"] == entity_name)) and + (('device type' in tmp_dict) and ((tmp_dict["device type"] == dev_type) or + ((tmp_dict["device type"] == '') and (dev_type == None))))): # the same entity if (tmp_dict is not None) and (prop_dict is not None): new_prop_dict = dict(tmp_dict.items() + prop_dict.items()) @@ -590,3 +625,28 @@ def _unregistration(self, msg_id, ref_entity): "name": ref_entity.name } } + + def get_properties(self, resource_uuid): + """ get list of properties with resource uuid """ + log.info("Get properties defined with IoTCC for resource {0}".format(resource_uuid)) + self.prop_list = None + + def on_response(msg): + try: + log.debug("Received msg: {0}".format(msg)) + json_msg = json.loads(msg) + log.debug("Processed msg: {0}".format(json_msg["type"])) + if json_msg["type"] == "get_properties_response" and json_msg["body"]["uuid"] != "null" and \ + json_msg["body"]["uuid"] == resource_uuid: + log.info("FOUND PROPERTIE LIST: {0}".format(json_msg["body"]["propertyList"])) + self.prop_list = json_msg["body"]["propertyList"] + log.info("prop_list:{0}".format(self.prop_list)) + else: + log.info("Waiting for getting properties") + on_response(self.recv_msg_queue.get(True,30)) + except: + raise Exception("Exception while getting properties") + + self.comms.send(json.dumps(self._get_properties(self.next_id(), resource_uuid))) + on_response(self.recv_msg_queue.get(True,30)) + return self.prop_list diff --git a/requirements.txt b/requirements.txt index cc00ea7c..1b855d6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ mock==2.0.0 paho-mqtt==1.2 pint==0.7.2 websocket-client==0.37.0 - +uptime==3.0.1 From b60f37e6571203780b81b591fc81655b3952e23e Mon Sep 17 00:00:00 2001 From: winniex1 Date: Fri, 20 Oct 2017 16:33:22 -0700 Subject: [PATCH 2/5] For get property support, add more comments and correct typos --- liota/dccs/iotcc.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index 21f55d0d..9cf4f214 100644 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -568,6 +568,7 @@ def store_reg_entity_attributes(self, entity_type, entity_name, reg_entity_id, last_dtime = datetime.datetime.strptime(tmp_dict["Entity_Timestamp"], "%Y-%m-%dT%H:%M:%S") if (last_dtime <= self.boottime): list_prop = self.get_properties(reg_entity_id) + # merge property info from get_properties() into our local entity record tmp_dict = self.merge_prop_dict_list(tmp_dict, list_prop) if ((('entity type' in tmp_dict) and (tmp_dict["entity type"] == entity_type)) and (('name' in tmp_dict) and (tmp_dict["name"] == entity_name)) and @@ -661,15 +662,15 @@ def on_response(msg): log.debug("Processed msg: {0}".format(json_msg["type"])) if json_msg["type"] == "get_properties_response" and json_msg["body"]["uuid"] != "null" and \ json_msg["body"]["uuid"] == resource_uuid: - log.info("FOUND PROPERTIE LIST: {0}".format(json_msg["body"]["propertyList"])) + log.info("FOUND PROPERTY LIST: {0}".format(json_msg["body"]["propertyList"])) self.prop_list = json_msg["body"]["propertyList"] - log.info("prop_list:{0}".format(self.prop_list)) else: log.info("Waiting for getting properties") - on_response(self.recv_msg_queue.get(True,30)) + on_response(self.recv_msg_queue.get(True,300)) except: - raise Exception("Exception while getting properties") + log.exception("Exception while getting properties") + return None self.comms.send(json.dumps(self._get_properties(self.next_id(), resource_uuid))) - on_response(self.recv_msg_queue.get(True,30)) + on_response(self.recv_msg_queue.get(True,300)) return self.prop_list From c322ebb6a2226968596c970c0f46d66327803e38 Mon Sep 17 00:00:00 2001 From: winniex1 Date: Wed, 25 Oct 2017 00:06:33 -0700 Subject: [PATCH 3/5] Adapt to newly refactored Helix Adapter; remove system generated property filter (move it into Helix Adapter side). --- liota-lite/requirements.txt | 1 + liota/dccs/iotcc.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/liota-lite/requirements.txt b/liota-lite/requirements.txt index a140f1c0..910fe413 100644 --- a/liota-lite/requirements.txt +++ b/liota-lite/requirements.txt @@ -3,4 +3,5 @@ linux-metrics==0.1.4 mock==2.0.0 paho-mqtt==1.2 pint==0.7.2 +uptime==3.0.1 diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index 6535a131..dd35a305 100644 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -238,8 +238,11 @@ def _properties(self, msg_id, entity_type, entity_id, entity_name, timestamp, pr def _get_properties(self, msg_id, res_uuid): return { "transactionID": msg_id, - "type": "get_properties", - "uuid": res_uuid + "version": self._version, + "type": "get_properties_request", + "body": { + "uuid": res_uuid + } } def _format_data(self, reg_metric): @@ -524,9 +527,7 @@ def merge_prop_dict_list(self, prop_dict, prop_list): if (prop_dict is None): prop_dict = {} for item in prop_list: - for key in item: - if key.find("System Properties|") == -1: - prop_dict.update(item) + prop_dict.update(item) # get updated dict return prop_dict @@ -652,7 +653,8 @@ def on_response(msg): try: log.debug("Received msg: {0}".format(msg)) json_msg = json.loads(msg) - log.debug("Processed msg: {0}".format(json_msg["type"])) + log.debug("Processing msg: {0}".format(json_msg["type"])) + self._check_version(json_msg) if json_msg["type"] == "get_properties_response" and json_msg["body"]["uuid"] != "null" and \ json_msg["body"]["uuid"] == resource_uuid: log.info("FOUND PROPERTY LIST: {0}".format(json_msg["body"]["propertyList"])) From 2c69b5fa8bcd4d3897d19488a6391cd548ffe7ff Mon Sep 17 00:00:00 2001 From: winniex1 Date: Wed, 1 Nov 2017 16:49:03 -0700 Subject: [PATCH 4/5] unify queue waiting timeout --- liota/dccs/iotcc.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) mode change 100644 => 100755 liota/dccs/iotcc.py diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py old mode 100644 new mode 100755 index 561e49d9..5ca7ce51 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -664,11 +664,10 @@ def on_response(msg): self.prop_list = json_msg["body"]["propertyList"] else: log.info("Waiting for getting properties") - on_response(self.recv_msg_queue.get(True,300)) + on_response(self.recv_msg_queue.get(True, timeout)) except: log.exception("Exception while getting properties") - return None self.comms.send(json.dumps(self._get_properties(self.next_id(), resource_uuid))) - on_response(self.recv_msg_queue.get(True,300)) + on_response(self.recv_msg_queue.get(True, timeout)) return self.prop_list From fca0d548a1f9f4126d20e10a9d8d5f45d9e5d9cc Mon Sep 17 00:00:00 2001 From: winniex1 Date: Thu, 2 Nov 2017 16:40:29 -0700 Subject: [PATCH 5/5] Get configuration of "enable_reboot_getprop" from liota.conf --- liota/dccs/iotcc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index 5ca7ce51..f885a3c4 100755 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -75,6 +75,7 @@ def __init__(self, con): time.sleep(0.5) self._iotcc_json = self._create_iotcc_json() self._iotcc_json_load_retry = int(read_liota_config('IOTCC_PATH', 'iotcc_load_retry')) + self.enable_reboot_getprop = read_liota_config('IOTCC_PATH', 'enable_reboot_getprop') self.counter = 0 self.recv_msg_queue = self.comms.userdata self.boottime = boottime() @@ -138,7 +139,7 @@ def on_response(msg): return RegisteredEntity(entity_obj, self, self.reg_entity_id) def _check_version(self, json_msg): - if json_msg["version"] != self._version: + if json_msg["version"] != self._version: raise Exception("CLIENT SERVER VERSION MISMATCH. CLIENT VERSION IS:" + self._version + ". SERVER VERSION IS:" + json_msg["version"]) def unregister(self, entity_obj): @@ -536,7 +537,7 @@ def merge_prop_dict_list(self, prop_dict, prop_list): def store_reg_entity_attributes(self, entity_type, entity_name, reg_entity_id, dev_type, prop_dict): - log.debug('store_reg_entity_attributes\n {0}:{1}:{2}:{3}'.format(entity_type, + log.debug('store_reg_entity_attributes {0}:{1}:{2}:{3}'.format(entity_type, entity_name, reg_entity_id, prop_dict)) ### Update IOTCC local entity file first