diff --git a/app.py b/app.py
index 3654ccb..2553368 100644
--- a/app.py
+++ b/app.py
@@ -1,6 +1,5 @@
 import datetime
 import importlib
-import json
 import logging
 import os
 import pathlib
@@ -9,9 +8,8 @@
 
 import httpx
 from kafka.producer.future import RecordMetadata
-from pydantic.error_wrappers import ValidationError
-from fvhiot.models.thingpark import DevEuiUplink
+
 from fvhiot.utils import init_script
 from fvhiot.utils.data import data_unpack, data_pack
 from fvhiot.utils.kafka import get_kafka_producer_by_envs, get_kafka_consumer_by_envs
@@ -26,9 +24,7 @@ def backup_messages(raw_data_topic: str, msg):
     """
     backupdir = pathlib.Path("messages") / pathlib.Path(raw_data_topic)
     backupdir.mkdir(parents=True, exist_ok=True)
-    backupfile = backupdir / pathlib.Path(
-        "{}.msgpack".format(datetime.datetime.utcnow().strftime("%Y-%m-%d"))
-    )
+    backupfile = backupdir / pathlib.Path("{}.msgpack".format(datetime.datetime.utcnow().strftime("%Y-%m-%d")))
     try:
         with open(backupfile, "ab") as outfile:
             outfile.write(msg.value)  # msg.value is msgpack.packb()'ed data
@@ -51,13 +47,6 @@ def on_send_error(excp):
     logging.error("Error on Kafka producer", exc_info=excp)
 
 
-# TODO: digita related
-def get_uplink_obj(data: dict) -> DevEuiUplink:
-    body_data = json.loads(data["request"]["body"].decode())
-    uplink_obj = DevEuiUplink(**body_data["DevEUI_uplink"])
-    return uplink_obj
-
-
 def get_device_data_devreg(device_id: str) -> dict:
     """
     Get device metadata from device registry
@@ -70,8 +59,7 @@ def get_device_data_devreg(device_id: str) -> dict:
     devreg_token = os.getenv("DEVICE_REGISTRY_TOKEN")
     if devreg_url is None or devreg_token is None:
         logging.error(
-            "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, "
-            "querying device metadata failed"
+            "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " "querying device metadata failed"
         )
         return metadata
     if device_id is None:
@@ -94,9 +82,7 @@ def get_device_data_devreg(device_id: str) -> dict:
             metadata = response.json()
             logging.debug(metadata)
         else:
-            logging.warning(
-                f"Device registry returned {response.status_code} {response.text}"
-            )
+            logging.warning(f"Device registry returned {response.status_code} {response.text}")
     except httpx.HTTPError as err:
         logging.exception(f"{err}")
 
@@ -114,25 +100,12 @@ def get_device_data(device_id: str) -> dict:
     if os.getenv("DEVICE_REGISTRY_URL"):
         return get_device_data_devreg(device_id)
     else:
-        logging.error(
-            "DEVICE_REGISTRY_URL must be defined, querying device metadata failed"
-        )
+        logging.error("DEVICE_REGISTRY_URL must be defined, querying device metadata failed")
         return metadata
 
 
-# TODO: digita related
-def parse_payload(
-    parser_module, payload_hex: str, port: int, timestamp: datetime.datetime
-):
-    # TODO: implement new function in parser_module, which accepts timestamp
-    val = parser_module.decode_hex(payload_hex, port=port)
-    return val
-
-
 # TODO: generic
-def create_parsed_data_message(
-    timestamp: datetime.datetime, payload: list, device: dict
-) -> dict:
+def create_parsed_data_message(timestamp: datetime.datetime, payload: list, device: dict) -> dict:
     """
     Mega function to create parsed data messages.
     Data structure loosely follows JTS (Json time series) format.
@@ -145,9 +118,7 @@ def create_parsed_data_message(
     time_str = timestamp.isoformat()
     parsed_data["meta"] = {
         "timestamp_received": "{}".format(time_str),
-        "timestamp_parsed": "{}".format(
-            datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat()
-        ),
+        "timestamp_parsed": "{}".format(datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat()),
     }
     parsed_data["device"] = device
     # header varibles
@@ -162,9 +133,7 @@ def create_parsed_data_message(
         for d in item["data"].keys():
            keys.add(d)
     keys = sorted(list(keys))  # now we have all unique keys in sorted list
-    col_map = (
-        {}
-    )  # create mapping for silly "0", "1", "2" named columns and real data keys
+    col_map = {}  # create mapping for silly "0", "1", "2" named columns and real data keys
     for k in keys:
         col_name = str(col_cnt)  # "0", "1", "2" and so on
         columns[col_name] = {"name": k}  # e.g. columns["0] = {"name" : "temp"}
@@ -175,9 +144,7 @@ def create_parsed_data_message(
             "time": item["time"],
             "f": {},
         }  # take "time" as is, we trust that it is a valid ISO date str
-        for k, v in sorted(
-            item["data"].items()
-        ):  # put keys into "f" in sorted order (same as in header)
+        for k, v in sorted(item["data"].items()):  # put keys into "f" in sorted order (same as in header)
             col_name = col_map[k]
             data_item["f"][col_name] = {"v": v}
         parsed_data["data"].append(data_item)
@@ -195,15 +162,40 @@ def create_parsed_data_message(
     return parsed_data
 
 
+def process_kafka_raw_topic(raw_data: bytes):
+    # TODO doc
+    unpacked_data = data_unpack(raw_data)
+    logging.info(pformat(unpacked_data))
+    device_id = unpacked_data["device_id"]
+    if device_id is None:
+        logging.warning("Device id not found in raw data - unpacked_data['device_id'] ")
+        # TODO: store data for future re-processing
+        return unpacked_data, None, None
+    device_data = get_device_data(device_id)
+    # if device_data is None or "device_metadata" not in device_data:
+    if device_data is None:
+        logging.warning(f"Device data not found for device_id: {device_id}")
+        # TODO: store data for future re-processing
+        return unpacked_data, None, None
+
+    parser_module_name = device_data.get("parser_module", "")
+    if parser_module_name == "":
+        logging.warning("Parser module name not found")
+        # TODO: store data for future re-processing
+        return unpacked_data, device_data, None
+
+    print(device_data)
+    print(f"printing parser module {parser_module_name}")
+    return unpacked_data, device_data, parser_module_name
+
+
 def main():
     init_script()
-    raw_data_topic = os.getenv("KAFKA_RAW_DATA_TOPIC_NAME")
+    raw_data_topic = "cesva.rawdata"  # os.getenv("KAFKA_RAW_DATA_TOPIC_NAME")
     parsed_data_topic = os.getenv("KAFKA_PARSED_DATA_TOPIC_NAME")
-    logging.info(
-        f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}"
-    )
+    logging.info(f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}")
     # Create Kafka consumer for incoming raw data messages
-    consumer = get_kafka_consumer_by_envs(raw_data_topic)
+    consumer = get_kafka_consumer_by_envs(raw_data_topic)  # listen to multiple topics -> ?
     producer = get_kafka_producer_by_envs()
     if consumer is None or producer is None:
         logging.critical("Kafka connection failed, exiting.")
@@ -213,62 +205,25 @@ def main():
     logging.info("Parser is waiting for raw data messages from Kafka.")
     for msg in consumer:
         logging.info("Preparing to parse payload")
-        data = data_unpack(msg.value)
-        logging.info(pformat(data))
-        # if os.getenv("DEBUG"):
-        #     backup_messages(raw_data_topic, msg)
-        data["raw_data_topic"] = raw_data_topic  # TODO: add this value in endpoint
-        try:
-            uplink_obj = get_uplink_obj(data)  # TODO: handle errors here
-        except KeyError as err:
-            logging.warning(f"KeyError '{err}', message has no DevEUI_uplink key?")
-            continue
-        except ValidationError as err:
-            logging.warning(f"ValidationError '{err}'")
-            continue
-        device_data = get_device_data(uplink_obj.DevEUI)
-        # if device_data is None or "device_metadata" not in device_data:
-        if device_data is None:
-            logging.warning(f"Device data for {uplink_obj.DevEUI} not found.")
-            # TODO: store data for future re-processing
-            continue
-        # print(type(device_data), device_data)
-        # TODO: use better datetime parser?
-        packet_timestamp = datetime.datetime.strptime(
-            uplink_obj.Time, "%Y-%m-%dT%H:%M:%S.%f%z"
-        )
-        parser_module_name = device_data.get("parser_module", "")
-        if parser_module_name == "":
-            logging.warning("Parser module name not found")
-            # TODO: store data for future re-processing
-            continue
-        logging.info(
-            f"Trying to parse hex payload {uplink_obj.payload_hex} with "
-            f"{parser_module_name}"
-        )
-        try:
-            parser_module = importlib.import_module(parser_module_name)
-        except ModuleNotFoundError as err:
-            logging.warning(f"Importing parser module failed: {err}")
-            # TODO: store data for future re-processing
-            continue
-        try:
-            print(uplink_obj.payload_hex, uplink_obj.FPort, uplink_obj.Time)
-            datalines = parser_module.create_datalines(
-                uplink_obj.payload_hex, port=uplink_obj.FPort, time_str=uplink_obj.Time
-            )
-            parsed_data = create_parsed_data_message(
-                packet_timestamp, datalines, device_data
-            )
-            logging.debug(pformat(parsed_data))
-            packed_data = data_pack(parsed_data)
-            producer.send(parsed_data_topic, packed_data).add_callback(
-                on_send_success
-            ).add_errback(on_send_error)
-        except Exception as err:
-            logging.exception(f"Failed to get parser module: {err}")
-            # TODO: send data to spare topic for future reprocessing?
+        [unpacked_data, device_data, parser_module_name] = process_kafka_raw_topic(msg.value)
+        print(f"printing unpacked data {unpacked_data}")
+        if parser_module_name:
+            try:
+                parser_module = importlib.import_module(parser_module_name)
+            except ModuleNotFoundError as err:
+                logging.warning(f"Importing parser module failed: {err}")
+                # TODO: store data for future re-processing
+                continue
+            try:
+                packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data)
+                parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data)
+                logging.debug(pformat(parsed_data))
+                packed_data = data_pack(parsed_data)
+                producer.send(parsed_data_topic, packed_data).add_callback(on_send_success).add_errback(on_send_error)
+            except Exception as err:
+                logging.exception(f"Failed to get parser module: {err}")
+                # TODO: send data to spare topic for future reprocessing?
 
 
 if __name__ == "__main__":
diff --git a/docker-compose.yml b/docker-compose.yml
index 0a3b1c9..16cbd79 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,13 +3,14 @@ version: '3.8'
 services:
 
-  parser-digita:
-    image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser
+  parser:
+    #image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser
     build: .
-    depends_on:
-      kafka:
-        condition: service_healthy
-    restart: unless-stopped
+    #command: ["python", "./watcher.py"]
+    # depends_on:
+    #   kafka:
+    #     condition: service_healthy
+    #restart: unless-stopped
     environment:
       KAFKA_HOST: "kafka"
       KAFKA_PORT: 9092
@@ -17,34 +18,46 @@ services:
       KAFKA_GROUP_ID: "digita_dev"
       KAFKA_PARSED_DATA_TOPIC_NAME: "digita.parseddata"
       KAFKA_RAW_DATA_TOPIC_NAME: "digita.rawdata"
+      DEVREG_ENDPOINTS_URL: "http://devreg:8000/api/v1/hosts/localhost/"
       DEVICE_REGISTRY_URL: "http://devreg:8000/api/v1"
-      DEVICE_REGISTRY_TOKEN: b48455759b691baf3b811ba437ce9e581fc0a37e
+      DEVICE_REGISTRY_TOKEN: abcdef1234567890abcdef1234567890abcdef12
       LOG_LEVEL: "DEBUG"
       DEBUG: 1
       DEV_SERVER: 1
 
-  kafka:
-    image: bitnami/kafka:3.4
-    ports:
-      - "9092:9092"
     volumes:
-      - "kafka_data:/bitnami"
-    environment:
-      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
-      # Kafka KRaft settings
-      KAFKA_CFG_NODE_ID: 0
-      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
-      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
-      # Listeners
-      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
-      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
-      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
-      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
-      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
-    healthcheck:
-      interval: 10s
-      retries: 3
-      test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe
-      timeout: 5s
+      - .:/home/app
+      - ../fvhiot-python/fvhiot:/home/app/fvhiot
+    networks:
+      - mittaridatapumppu-dev_dkrnw
+
+# kafka:
+#   image: bitnami/kafka:3.4
+#   ports:
+#     - "9092:9092"
+#   volumes:
+#     - "kafka_data:/bitnami"
+#   environment:
+#     KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
+#     # Kafka KRaft settings
+#     KAFKA_CFG_NODE_ID: 0
+#     KAFKA_CFG_PROCESS_ROLES: "controller,broker"
+#     KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
+#     # Listeners
+#     KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
+#     KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
+#     KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+#     KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
+#     KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
+#   healthcheck:
+#     interval: 10s
+#     retries: 3
+#     test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe
+#     timeout: 5s
+
+# volumes:
+#   kafka_data:
+
-volumes:
-  kafka_data:
+networks:
+  mittaridatapumppu-dev_dkrnw:
+    external: true