diff --git a/agent.py b/agent.py index 5db9ac8..035ac66 100644 --- a/agent.py +++ b/agent.py @@ -621,6 +621,89 @@ def bciCallback(self, topic, payload): self.helpers.logger.error( entityType + " " + entity + " BCI data update KO") + def aiModelCallback(self, topic, payload): + """Called in the event of an AI payload + + Args: + topic (str): The topic the payload was sent to. + payload (:obj:`str`): The payload. + """ + + data = json.loads(payload.decode("utf-8")) + splitTopic = topic.split("/") + + if splitTopic[1] not in self.ignoreTypes: + entityType = splitTopic[1][:-1] + else: + entityType = splitTopic[1] + + if entityType in ["Robotics", "Application", "Staff"]: + entity = splitTopic[2] + else: + entity = splitTopic[3] + + self.helpers.logger.info( + "Received " + entityType + " AI Data: " + str(data)) + + attrs = self.getRequiredAttributes(entityType, entity) + bch = attrs["blockchain"] + + if not self.hiasbch.iotJumpWayAccessCheck(bch): + return + + entity = attrs["id"] + location = attrs["location"] + zone = attrs["zone"] if "zone" in attrs else "NA" + + updateResponse = self.hiascdi.updateEntity( + entity, entityType, { + "network.status": {"value": "ONLINE"}, + "network.status.metadata": {"timestamp": datetime.now().isoformat()}, + "dateModified": {"value": datetime.now().isoformat()} + }) + + models = self.hiascdi.getAiModels(entity, entityType) + modelData = models["models"]["value"] + modelExists = False + + for model in modelData: + modelExists = True + if model == data["Model"] and data["State"] in modelData[data["Model"]]["states"]["value"]: + modelData[data["Model"]]["state"] = { + "value": data["State"], + "timestamp": datetime.now().isoformat() + } + if model == data["Model"] and data["Type"] in modelData[data["Model"]]["properties"]["value"]: + modelData[data["Model"]]["properties"]["value"][data["Type"]] = { + "value": data["Value"], + "timestamp": datetime.now().isoformat() + } + + if modelExists: + updateResponse = self.hiascdi.updateEntity( + entity, entityType, { + "models": {"value": modelData}, + "dateModified": {"value": datetime.now().isoformat()} + }) + + if updateResponse: + _id = self.hiashdi.insertData("AI", { + "Use": entityType, + "Location": location, + "Zone": zone, + "Agent": entity, + "Type": data["Type"], + "Value": data["Value"], + "Message": data["Message"], + "Time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') + }) + + self.helpers.logger.info( + entityType + " " + entity + " AI model data update OK") + else: + self.helpers.logger.error( + entityType + " " + entity + " AI model data update KO") + def respond(self, responseCode, response): """ Returns the request repsonse """ @@ -671,12 +754,13 @@ def main(): "up": agent.credentials["iotJumpWay"]["up"] }) - agent.mqtt.statusCallback = agent.statusCallback - agent.mqtt.lifeCallback = agent.lifeCallback - agent.mqtt.sensorsCallback = agent.sensorsCallback agent.mqtt.actuatorCallback = agent.actuatorCallback - agent.mqtt.commandsCallback = agent.commandsCallback + agent.mqtt.aiModelCallback = agent.aiModelCallback agent.mqtt.bciCallback = agent.bciCallback + agent.mqtt.commandsCallback = agent.commandsCallback + agent.mqtt.lifeCallback = agent.lifeCallback + agent.mqtt.sensorsCallback = agent.sensorsCallback + agent.mqtt.statusCallback = agent.statusCallback agent.threading() diff --git a/modules/hiascdi.py b/modules/hiascdi.py index fa47189..f47138e 100644 --- a/modules/hiascdi.py +++ b/modules/hiascdi.py @@ -110,3 +110,14 @@ def getActuators(self, _id, typeof): response = requests.get(apiUrl, headers=self.headers, auth=self.auth) return json.loads(response.text) + + def getAiModels(self, _id, typeof): + """ Gets AI Agent models. """ + + apiUrl = "https://" + self.helpers.credentials["server"]["host"] + "/" + \ + self.helpers.credentials["hiascdi"]["endpoint"] + \ + "/entities/" + _id + "?type=" + typeof + "&attrs=models" + + response = requests.get(apiUrl, headers=self.headers, auth=self.auth) + + return json.loads(response.text) diff --git a/modules/mqtt.py b/modules/mqtt.py index c78e726..bafccd7 100644 --- a/modules/mqtt.py +++ b/modules/mqtt.py @@ -98,7 +98,7 @@ def configure(self): self.commandsCallback = None self.integrityCallback = None self.lifeCallback = None - self.modelCallback = None + self.aiModelCallback = None self.sensorsCallback = None self.stateCallback = None self.statusCallback = None @@ -170,12 +170,10 @@ def on_message(self, client, obj, msg): if connType == "Agents": topic = splitTopic[4] - elif connType == "Robotics": - topic = splitTopic[3] + elif connType == "AI": + topic = splitTopic[4] elif connType == "Applications": topic = splitTopic[3] - elif connType == "Staff": - topic = splitTopic[3] elif connType == "Devices": topic = splitTopic[4] elif connType == "HIASBCH": @@ -184,6 +182,10 @@ def on_message(self, client, obj, msg): topic = splitTopic[4] elif connType == "HIASHDI": topic = splitTopic[4] + elif connType == "Robotics": + topic = splitTopic[3] + elif connType == "Staff": + topic = splitTopic[3] self.helpers.logger.info(msg.payload) self.helpers.logger.info("iotJumpWay " + connType + " " + msg.topic + " communication received.") @@ -194,6 +196,12 @@ def on_message(self, client, obj, msg): connType + " actuator callback required (actuatorCallback) !") else: self.actuatorCallback(msg.topic, msg.payload) + elif topic == 'AI': + if self.aiModelCallback == None: + self.helpers.logger.info( + connType + " AI callback required (aiModelCallback) !") + else: + self.aiModelCallback(msg.topic, msg.payload) elif topic == 'BCI': if self.bciCallback == None: self.helpers.logger.info(