From 0b7425ca3ae6073776271602f6bfce3387183047 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:09:49 +0300 Subject: [PATCH 1/7] removing device specific parsing from app (they go to device specific parser modules) --- app.py | 159 ++++++++++++++++----------------------------- docker-compose.yml | 75 ++++++++++++--------- 2 files changed, 101 insertions(+), 133 deletions(-) diff --git a/app.py b/app.py index 3654ccb..2553368 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,5 @@ import datetime import importlib -import json import logging import os import pathlib @@ -9,9 +8,8 @@ import httpx from kafka.producer.future import RecordMetadata -from pydantic.error_wrappers import ValidationError -from fvhiot.models.thingpark import DevEuiUplink + from fvhiot.utils import init_script from fvhiot.utils.data import data_unpack, data_pack from fvhiot.utils.kafka import get_kafka_producer_by_envs, get_kafka_consumer_by_envs @@ -26,9 +24,7 @@ def backup_messages(raw_data_topic: str, msg): """ backupdir = pathlib.Path("messages") / pathlib.Path(raw_data_topic) backupdir.mkdir(parents=True, exist_ok=True) - backupfile = backupdir / pathlib.Path( - "{}.msgpack".format(datetime.datetime.utcnow().strftime("%Y-%m-%d")) - ) + backupfile = backupdir / pathlib.Path("{}.msgpack".format(datetime.datetime.utcnow().strftime("%Y-%m-%d"))) try: with open(backupfile, "ab") as outfile: outfile.write(msg.value) # msg.value is msgpack.packb()'ed data @@ -51,13 +47,6 @@ def on_send_error(excp): logging.error("Error on Kafka producer", exc_info=excp) -# TODO: digita related -def get_uplink_obj(data: dict) -> DevEuiUplink: - body_data = json.loads(data["request"]["body"].decode()) - uplink_obj = DevEuiUplink(**body_data["DevEUI_uplink"]) - return uplink_obj - - def get_device_data_devreg(device_id: str) -> dict: """ Get device metadata from device registry @@ -70,8 +59,7 @@ def get_device_data_devreg(device_id: str) -> dict: devreg_token = os.getenv("DEVICE_REGISTRY_TOKEN") if devreg_url is None or devreg_token is None: logging.error( - "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " - "querying device metadata failed" + "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " "querying device metadata failed" ) return metadata if device_id is None: @@ -94,9 +82,7 @@ def get_device_data_devreg(device_id: str) -> dict: metadata = response.json() logging.debug(metadata) else: - logging.warning( - f"Device registry returned {response.status_code} {response.text}" - ) + logging.warning(f"Device registry returned {response.status_code} {response.text}") except httpx.HTTPError as err: logging.exception(f"{err}") @@ -114,25 +100,12 @@ def get_device_data(device_id: str) -> dict: if os.getenv("DEVICE_REGISTRY_URL"): return get_device_data_devreg(device_id) else: - logging.error( - "DEVICE_REGISTRY_URL must be defined, querying device metadata failed" - ) + logging.error("DEVICE_REGISTRY_URL must be defined, querying device metadata failed") return metadata -# TODO: digita related -def parse_payload( - parser_module, payload_hex: str, port: int, timestamp: datetime.datetime -): - # TODO: implement new function in parser_module, which accepts timestamp - val = parser_module.decode_hex(payload_hex, port=port) - return val - - # TODO: generic -def create_parsed_data_message( - timestamp: datetime.datetime, payload: list, device: dict -) -> dict: +def create_parsed_data_message(timestamp: datetime.datetime, payload: list, device: dict) -> dict: """ 
Mega function to create parsed data messages. Data structure loosely follows JTS (Json time series) format. @@ -145,9 +118,7 @@ def create_parsed_data_message( time_str = timestamp.isoformat() parsed_data["meta"] = { "timestamp_received": "{}".format(time_str), - "timestamp_parsed": "{}".format( - datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat() - ), + "timestamp_parsed": "{}".format(datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat()), } parsed_data["device"] = device # header varibles @@ -162,9 +133,7 @@ def create_parsed_data_message( for d in item["data"].keys(): keys.add(d) keys = sorted(list(keys)) # now we have all unique keys in sorted list - col_map = ( - {} - ) # create mapping for silly "0", "1", "2" named columns and real data keys + col_map = {} # create mapping for silly "0", "1", "2" named columns and real data keys for k in keys: col_name = str(col_cnt) # "0", "1", "2" and so on columns[col_name] = {"name": k} # e.g. columns["0] = {"name" : "temp"} @@ -175,9 +144,7 @@ def create_parsed_data_message( "time": item["time"], "f": {}, } # take "time" as is, we trust that it is a valid ISO date str - for k, v in sorted( - item["data"].items() - ): # put keys into "f" in sorted order (same as in header) + for k, v in sorted(item["data"].items()): # put keys into "f" in sorted order (same as in header) col_name = col_map[k] data_item["f"][col_name] = {"v": v} parsed_data["data"].append(data_item) @@ -195,15 +162,40 @@ def create_parsed_data_message( return parsed_data +def process_kafka_raw_topic(raw_data: bytes): + # TODO doc + unpacked_data = data_unpack(raw_data) + logging.info(pformat(unpacked_data)) + device_id = unpacked_data["device_id"] + if device_id is None: + logging.warning("Device id not found in raw data - unpacked_data['device_id'] ") + # TODO: store data for future re-processing + return unpacked_data, None, None + device_data = get_device_data(device_id) + # if device_data is None or "device_metadata" not in device_data: + if device_data is None: + logging.warning(f"Device data not found for device_id: {device_id}") + # TODO: store data for future re-processing + return unpacked_data, None, None + + parser_module_name = device_data.get("parser_module", "") + if parser_module_name == "": + logging.warning("Parser module name not found") + # TODO: store data for future re-processing + return unpacked_data, device_data, None + + print(device_data) + print(f"printing parser module {parser_module_name}") + return unpacked_data, device_data, parser_module_name + + def main(): init_script() - raw_data_topic = os.getenv("KAFKA_RAW_DATA_TOPIC_NAME") + raw_data_topic = "cesva.rawdata" # os.getenv("KAFKA_RAW_DATA_TOPIC_NAME") parsed_data_topic = os.getenv("KAFKA_PARSED_DATA_TOPIC_NAME") - logging.info( - f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}" - ) + logging.info(f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}") # Create Kafka consumer for incoming raw data messages - consumer = get_kafka_consumer_by_envs(raw_data_topic) + consumer = get_kafka_consumer_by_envs(raw_data_topic) # listen to multiple topics -> ? 
producer = get_kafka_producer_by_envs() if consumer is None or producer is None: logging.critical("Kafka connection failed, exiting.") @@ -213,62 +205,25 @@ def main(): logging.info("Parser is waiting for raw data messages from Kafka.") for msg in consumer: logging.info("Preparing to parse payload") - data = data_unpack(msg.value) - logging.info(pformat(data)) - # if os.getenv("DEBUG"): - # backup_messages(raw_data_topic, msg) - data["raw_data_topic"] = raw_data_topic # TODO: add this value in endpoint - try: - uplink_obj = get_uplink_obj(data) # TODO: handle errors here - except KeyError as err: - logging.warning(f"KeyError '{err}', message has no DevEUI_uplink key?") - continue - except ValidationError as err: - logging.warning(f"ValidationError '{err}'") - continue - device_data = get_device_data(uplink_obj.DevEUI) - # if device_data is None or "device_metadata" not in device_data: - if device_data is None: - logging.warning(f"Device data for {uplink_obj.DevEUI} not found.") - # TODO: store data for future re-processing - continue - # print(type(device_data), device_data) - # TODO: use better datetime parser? - packet_timestamp = datetime.datetime.strptime( - uplink_obj.Time, "%Y-%m-%dT%H:%M:%S.%f%z" - ) - parser_module_name = device_data.get("parser_module", "") - if parser_module_name == "": - logging.warning("Parser module name not found") - # TODO: store data for future re-processing - continue - logging.info( - f"Trying to parse hex payload {uplink_obj.payload_hex} with " - f"{parser_module_name}" - ) - try: - parser_module = importlib.import_module(parser_module_name) - except ModuleNotFoundError as err: - logging.warning(f"Importing parser module failed: {err}") - # TODO: store data for future re-processing - continue - try: - print(uplink_obj.payload_hex, uplink_obj.FPort, uplink_obj.Time) - datalines = parser_module.create_datalines( - uplink_obj.payload_hex, port=uplink_obj.FPort, time_str=uplink_obj.Time - ) - parsed_data = create_parsed_data_message( - packet_timestamp, datalines, device_data - ) - logging.debug(pformat(parsed_data)) - packed_data = data_pack(parsed_data) - producer.send(parsed_data_topic, packed_data).add_callback( - on_send_success - ).add_errback(on_send_error) - except Exception as err: - logging.exception(f"Failed to get parser module: {err}") - # TODO: send data to spare topic for future reprocessing? + [unpacked_data, device_data, parser_module_name] = process_kafka_raw_topic(msg.value) + print(f"printing unpacked data {unpacked_data}") + if parser_module_name: + try: + parser_module = importlib.import_module(parser_module_name) + except ModuleNotFoundError as err: + logging.warning(f"Importing parser module failed: {err}") + # TODO: store data for future re-processing + continue + try: + packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data) + parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data) + logging.debug(pformat(parsed_data)) + packed_data = data_pack(parsed_data) + producer.send(parsed_data_topic, packed_data).add_callback(on_send_success).add_errback(on_send_error) + except Exception as err: + logging.exception(f"Failed to get parser module: {err}") + # TODO: send data to spare topic for future reprocessing? 
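The rewritten main loop above now expects every device-specific parser module (named in the device registry metadata and imported with importlib) to expose create_datalines_from_raw_unpacked_data(). No such module is included in this patch series, so the following is a minimal sketch of the assumed contract. Only the function name and its (packet_timestamp, datalines) return shape come from this patch; the module name, the JSON field names ("time", "payload_hex", "port") and the decode_hex() helper are illustrative assumptions modelled on the digita-specific code this patch removes.

# hypothetical_parser.py -- illustrative sketch only, not part of this patch series.
import datetime
import json


def decode_hex(payload_hex: str, port: int) -> dict:
    # Device-specific decoding would happen here; fixed values keep the sketch self-contained.
    return {"temp": 21.5, "batt": 3.6}


def create_datalines_from_raw_unpacked_data(unpacked_data: dict):
    """Turn the msgpack-unpacked raw message into (packet_timestamp, datalines)."""
    # Assumption: the raw HTTP body is still carried under unpacked_data["request"]["body"],
    # as it was in the removed digita-specific get_uplink_obj().
    body = json.loads(unpacked_data["request"]["body"].decode())
    time_str = body["time"]  # ISO 8601 string, e.g. "2023-10-11T08:00:00.000+0300"
    packet_timestamp = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z")
    values = decode_hex(body["payload_hex"], port=int(body.get("port", 1)))
    # create_parsed_data_message() expects a list of {"time": <ISO string>, "data": {key: value}} items.
    datalines = [{"time": time_str, "data": values}]
    return packet_timestamp, datalines

Fed through create_parsed_data_message(), such a dataline becomes one row whose keys are renamed to "0", "1", ... in sorted order (e.g. columns["0"] = {"name": "batt"}) with values stored under data[i]["f"]["0"]["v"], matching the column-mapping logic earlier in this patch.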
if __name__ == "__main__": diff --git a/docker-compose.yml b/docker-compose.yml index 0a3b1c9..16cbd79 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,13 +3,14 @@ version: '3.8' services: - parser-digita: - image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser + parser: + #image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser build: . - depends_on: - kafka: - condition: service_healthy - restart: unless-stopped + #command: ["python", "./watcher.py"] + # depends_on: + # kafka: + # condition: service_healthy + #restart: unless-stopped environment: KAFKA_HOST: "kafka" KAFKA_PORT: 9092 @@ -17,34 +18,46 @@ services: KAFKA_GROUP_ID: "digita_dev" KAFKA_PARSED_DATA_TOPIC_NAME: "digita.parseddata" KAFKA_RAW_DATA_TOPIC_NAME: "digita.rawdata" + DEVREG_ENDPOINTS_URL: "http://devreg:8000/api/v1/hosts/localhost/" DEVICE_REGISTRY_URL: "http://devreg:8000/api/v1" - DEVICE_REGISTRY_TOKEN: b48455759b691baf3b811ba437ce9e581fc0a37e + DEVICE_REGISTRY_TOKEN: abcdef1234567890abcdef1234567890abcdef12 LOG_LEVEL: "DEBUG" DEBUG: 1 DEV_SERVER: 1 - kafka: - image: bitnami/kafka:3.4 - ports: - - "9092:9092" volumes: - - "kafka_data:/bitnami" - environment: - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" - # Kafka KRaft settings - KAFKA_CFG_NODE_ID: 0 - KAFKA_CFG_PROCESS_ROLES: "controller,broker" - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093" - # Listeners - KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" - KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092" - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" - healthcheck: - interval: 10s - retries: 3 - test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe - timeout: 5s + - .:/home/app + - ../fvhiot-python/fvhiot:/home/app/fvhiot + networks: + - mittaridatapumppu-dev_dkrnw + +# kafka: +# image: bitnami/kafka:3.4 +# ports: +# - "9092:9092" +# volumes: +# - "kafka_data:/bitnami" +# environment: +# KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" +# # Kafka KRaft settings +# KAFKA_CFG_NODE_ID: 0 +# KAFKA_CFG_PROCESS_ROLES: "controller,broker" +# KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093" +# # Listeners +# KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" +# KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092" +# KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" +# KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" +# KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" +# healthcheck: +# interval: 10s +# retries: 3 +# test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe +# timeout: 5s + +# volumes: +# kafka_data: + -volumes: - kafka_data: +networks: + mittaridatapumppu-dev_dkrnw: + external: true From 40b01d729a42da56643a7441296987de82ad8e6c Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:17:16 +0300 Subject: [PATCH 2/7] remove hardcoded topic name --- app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app.py b/app.py index 2553368..6675fec 100644 --- a/app.py +++ b/app.py @@ -191,7 +191,7 @@ def process_kafka_raw_topic(raw_data: bytes): def main(): init_script() - raw_data_topic = "cesva.rawdata" # os.getenv("KAFKA_RAW_DATA_TOPIC_NAME") + raw_data_topic = 
os.getenv("KAFKA_RAW_DATA_TOPIC_NAME") parsed_data_topic = os.getenv("KAFKA_PARSED_DATA_TOPIC_NAME") logging.info(f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}") # Create Kafka consumer for incoming raw data messages From d3fef73ffe9d64c8d9e8fe02bcf9a7ec2ad583a0 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:35:03 +0300 Subject: [PATCH 3/7] create async tasks for multiple data streams processing --- app.py | 220 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 154 insertions(+), 66 deletions(-) diff --git a/app.py b/app.py index 6675fec..4bd856c 100644 --- a/app.py +++ b/app.py @@ -5,14 +5,28 @@ import pathlib from pprint import pformat from zoneinfo import ZoneInfo - +import asyncio import httpx -from kafka.producer.future import RecordMetadata - +from kafka.errors import UnknownTopicOrPartitionError from fvhiot.utils import init_script from fvhiot.utils.data import data_unpack, data_pack -from fvhiot.utils.kafka import get_kafka_producer_by_envs, get_kafka_consumer_by_envs +from fvhiot.utils.aiokafka import ( + get_aiokafka_producer_by_envs, + get_aiokafka_consumer_by_envs, + on_send_success, + on_send_error, +) + +# TODO: for testing, add better defaults (or remove completely to make sure it is set in env) +DEVREG_ENDPOINTS_URL = os.getenv("DEVREG_ENDPOINTS_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/") +DEVREG_API_TOKEN = os.getenv("DEVREG_API_TOKEN", "abcdef1234567890abcdef1234567890abcdef12") + +device_registry_request_headers = { + "Authorization": f"Token {DEVREG_API_TOKEN}", + "User-Agent": "mittaridatapumppu-endpoint/0.1.0", + "Accept": "application/json", +} def backup_messages(raw_data_topic: str, msg): @@ -33,21 +47,7 @@ def backup_messages(raw_data_topic: str, msg): logging.error(msg) -# TODO: to kafka module -def on_send_success(record_metadata: RecordMetadata): - logging.info( - "Successfully sent to topic {}, partition {}, offset {}".format( - record_metadata.topic, record_metadata.partition, record_metadata.offset - ) - ) - - -# TODO: to kafka module -def on_send_error(excp): - logging.error("Error on Kafka producer", exc_info=excp) - - -def get_device_data_devreg(device_id: str) -> dict: +async def get_device_data_devreg(device_id: str) -> dict: """ Get device metadata from device registry @@ -75,21 +75,63 @@ def get_device_data_devreg(device_id: str) -> dict: "Authorization": f"Token {devreg_token}", "User-Agent": "mittaridatapumppu-parser/0.0.1", } - - try: - response = httpx.get(url, headers=headers) - if response.status_code == 200: - metadata = response.json() - logging.debug(metadata) - else: - logging.warning(f"Device registry returned {response.status_code} {response.text}") - except httpx.HTTPError as err: - logging.exception(f"{err}") + async with httpx.AsyncClient() as client: + try: + response = await client.get(url, headers=headers) + if response.status_code == 200: + metadata = response.json() + logging.debug(metadata) + else: + logging.warning(f"Device registry returned {response.status_code} {response.text}") + except httpx.HTTPError as err: + logging.exception(f"{err}") return metadata -def get_device_data(device_id: str) -> dict: +async def get_kafka_topics_from_device_registry_endpoints(fail_on_error: bool) -> dict: + """ + Update kafka topics information for endpoints from device registry. + This is done on startup and when device registry is updated. 
+ """ + # Create request to ENDPOINTS_URL and get data using httpx + async with httpx.AsyncClient() as client: + try: + response = await client.get(DEVREG_ENDPOINTS_URL, headers=device_registry_request_headers) + if response.status_code == 200: + data = response.json() + logging.info(f"Got {len(data['endpoints'])} endpoints from device registry {DEVREG_ENDPOINTS_URL}") + endpoint_topic_mappings = {} + for endpoint in data["endpoints"]: + try: + endpoint_path = endpoint["endpoint_path"] + raw_topic = endpoint["kafka_raw_data_topic"] + parsed_topic = endpoint["kafka_parsed_data_topic"] + group_id = endpoint["kafka_group_id"] + + endpoint_topic_mappings[endpoint_path] = { + "raw_topic": raw_topic, + "parsed_topic": parsed_topic, + "group_id": group_id, + "endpoint_path": endpoint_path, + } + + except KeyError as e: + logging.error(f"ATTENTION: endpoint data missing kafka topic information. {e} in {endpoint}") + + print(f"REMOVE ME {endpoint_topic_mappings}") + if len(endpoint_topic_mappings) >= 1: + return endpoint_topic_mappings + + except Exception as e: + logging.error(f"Failed to get endpoints from device registry {DEVREG_ENDPOINTS_URL}: {e}") + if fail_on_error: + raise e + + return None + + +async def get_device_data(device_id: str) -> dict: """ Get device metadata from device registry @@ -98,7 +140,7 @@ def get_device_data(device_id: str) -> dict: """ metadata = {} if os.getenv("DEVICE_REGISTRY_URL"): - return get_device_data_devreg(device_id) + return await get_device_data_devreg(device_id) else: logging.error("DEVICE_REGISTRY_URL must be defined, querying device metadata failed") return metadata @@ -162,16 +204,16 @@ def create_parsed_data_message(timestamp: datetime.datetime, payload: list, devi return parsed_data -def process_kafka_raw_topic(raw_data: bytes): +async def process_kafka_raw_topic(raw_data: bytes): # TODO doc unpacked_data = data_unpack(raw_data) logging.info(pformat(unpacked_data)) - device_id = unpacked_data["device_id"] + device_id = unpacked_data.get("device_id") if device_id is None: logging.warning("Device id not found in raw data - unpacked_data['device_id'] ") # TODO: store data for future re-processing return unpacked_data, None, None - device_data = get_device_data(device_id) + device_data = await get_device_data(device_id) # if device_data is None or "device_metadata" not in device_data: if device_data is None: logging.warning(f"Device data not found for device_id: {device_id}") @@ -189,45 +231,91 @@ def process_kafka_raw_topic(raw_data: bytes): return unpacked_data, device_data, parser_module_name -def main(): - init_script() - raw_data_topic = os.getenv("KAFKA_RAW_DATA_TOPIC_NAME") - parsed_data_topic = os.getenv("KAFKA_PARSED_DATA_TOPIC_NAME") - logging.info(f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}") +# TODO group id variable +async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer): + logging.info(f"Get Kafka consumer for {raw_data_topic}") # Create Kafka consumer for incoming raw data messages - consumer = get_kafka_consumer_by_envs(raw_data_topic) # listen to multiple topics -> ? - producer = get_kafka_producer_by_envs() - if consumer is None or producer is None: + # TODO make group id a an argument to be passed to get_kafka_consumer_by_envs + consumer = None + try: + consumer = await get_aiokafka_consumer_by_envs([raw_data_topic]) + except UnknownTopicOrPartitionError as err: + logging.error(f"Kafka Error. topic: {raw_data_topic} does not exist. 
error: {err}") + return + + if consumer is None: logging.critical("Kafka connection failed, exiting.") - exit(1) + return # Loop forever for incoming messages - logging.info("Parser is waiting for raw data messages from Kafka.") - for msg in consumer: - logging.info("Preparing to parse payload") - - [unpacked_data, device_data, parser_module_name] = process_kafka_raw_topic(msg.value) - print(f"printing unpacked data {unpacked_data}") - if parser_module_name: - try: - parser_module = importlib.import_module(parser_module_name) - except ModuleNotFoundError as err: - logging.warning(f"Importing parser module failed: {err}") - # TODO: store data for future re-processing - continue - try: - packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data) - parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data) - logging.debug(pformat(parsed_data)) - packed_data = data_pack(parsed_data) - producer.send(parsed_data_topic, packed_data).add_callback(on_send_success).add_errback(on_send_error) - except Exception as err: - logging.exception(f"Failed to get parser module: {err}") - # TODO: send data to spare topic for future reprocessing? + logging.info(f"Parser is waiting for raw data messages from Kafka topic {raw_data_topic}") + + try: + async for msg in consumer: + logging.info("Preparing to parse payload") + + [unpacked_data, device_data, parser_module_name] = await process_kafka_raw_topic(msg.value) + print(f"printing unpacked data {unpacked_data}") + if parser_module_name: + try: + parser_module = importlib.import_module(parser_module_name) + except ModuleNotFoundError as err: + logging.warning(f"Importing parser module failed: {err}") + # TODO: store data for future re-processing + continue + try: + packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data) + parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data) + logging.debug(pformat(parsed_data)) + packed_data = data_pack(parsed_data) + logging.info(f"Sending parsed data to Kafka topic {parsed_data_topic}") + future = await producer.send(parsed_data_topic, packed_data) + # .add_callback(on_send_success).add_errback( on_send_error) + + # Attach callbacks to the future + # add_done_callback takes a function as an argument. a lambda is an anonymous function + future.add_done_callback(lambda fut: asyncio.ensure_future(on_send_success(fut.result()))) + future.add_done_callback(lambda fut: fut.add_errback(on_send_error)) + + except Exception as err: + logging.exception(f"parse failed : {err}") + # TODO: send data to spare topic for future reprocessing? 
+ + finally: + await consumer.stop() + await producer.stop() + + +async def main(): + init_script() + + logging.info("Get producer for pushing parsed data messages to Kafka.") + producer = await get_aiokafka_producer_by_envs() + if producer is None: + logging.critical("Kafka connection failed, exiting.") + exit(1) + + tasks = [] + endpoint_topic_mappings = await get_kafka_topics_from_device_registry_endpoints(True) + endpoint_topic_mappings.pop("/api/v1/data") + endpoints = endpoint_topic_mappings.keys() + for endpoint in endpoints: + logging.info(f"Setting up parser for path: {endpoint}") + + e2t_map = endpoint_topic_mappings[endpoint] + print(e2t_map) + raw_data_topic = e2t_map["raw_topic"] + parsed_data_topic = e2t_map["parsed_topic"] + tasks.append(consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer)) + + try: + await asyncio.gather(*tasks) + except KeyboardInterrupt: + print("Bye!") if __name__ == "__main__": try: - main() + asyncio.run(main()) except KeyboardInterrupt: print("Bye!") From 01290f8189fb15d44f620579e09226a6cd36c063 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Fri, 13 Oct 2023 09:47:36 +0300 Subject: [PATCH 4/7] refactor code --- app.py | 95 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/app.py b/app.py index 4bd856c..d0653bf 100644 --- a/app.py +++ b/app.py @@ -231,59 +231,80 @@ async def process_kafka_raw_topic(raw_data: bytes): return unpacked_data, device_data, parser_module_name -# TODO group id variable -async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer): - logging.info(f"Get Kafka consumer for {raw_data_topic}") - # Create Kafka consumer for incoming raw data messages - # TODO make group id a an argument to be passed to get_kafka_consumer_by_envs - consumer = None +async def initialize_kafka_consumer(raw_data_topic): + """ + Initialize Kafka consumer for raw data topic + :param raw_data_topic: Raw data topic's name + :return: AIOKafkaConsumer + """ try: - consumer = await get_aiokafka_consumer_by_envs([raw_data_topic]) + # TODO make group id a an argument to be passed to get_kafka_consumer_by_envs ? + return await get_aiokafka_consumer_by_envs([raw_data_topic]) except UnknownTopicOrPartitionError as err: logging.error(f"Kafka Error. topic: {raw_data_topic} does not exist. error: {err}") - return + raise err - if consumer is None: - logging.critical("Kafka connection failed, exiting.") + except Exception as err: + logging.exception(f"Kafka Error. topic: {raw_data_topic} error: {err}") + raise err + + +async def produce_parsed_data_message(parsed_data_topic, producer, packed_data): + try: + future = await producer.send(parsed_data_topic, packed_data) + + # Attach callbacks to the future + # add_done_callback takes a function as an argument. 
a lambda is an anonymous function + future.add_done_callback(lambda fut: asyncio.ensure_future(on_send_success(fut.result()))) + future.add_done_callback(lambda fut: fut.add_errback(on_send_error)) + except Exception as e: + logging.error(f"Failed to send parsed data to Kafka topic {parsed_data_topic}: {e}") + raise e + + +async def parse_data(unpacked_data, device_data, parser_module_name): + logging.info("Preparing to parse payload") + try: + parser_module = importlib.import_module(parser_module_name) + packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data) + parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data) + logging.debug(pformat(parsed_data)) + packed_data = data_pack(parsed_data) + return packed_data + + except ModuleNotFoundError as err: + logging.critical(f"Importing parser module {parser_module_name} failed: {err}") + return + # TODO: store data for future re-processing + except Exception as err: + logging.exception(f"parsing failed at {parser_module_name} : {err}") + # TODO: send data to spare topic for future reprocessing? return - # Loop forever for incoming messages - logging.info(f"Parser is waiting for raw data messages from Kafka topic {raw_data_topic}") +# TODO group id variable +async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer): try: + logging.info(f"Get Kafka consumer for {raw_data_topic}") + consumer = await initialize_kafka_consumer(raw_data_topic) + + logging.info(f"Parser is waiting for raw data messages from Kafka topic {raw_data_topic}") + # Loop forever for incoming messages async for msg in consumer: logging.info("Preparing to parse payload") [unpacked_data, device_data, parser_module_name] = await process_kafka_raw_topic(msg.value) print(f"printing unpacked data {unpacked_data}") if parser_module_name: - try: - parser_module = importlib.import_module(parser_module_name) - except ModuleNotFoundError as err: - logging.warning(f"Importing parser module failed: {err}") - # TODO: store data for future re-processing - continue - try: - packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data) - parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data) - logging.debug(pformat(parsed_data)) - packed_data = data_pack(parsed_data) - logging.info(f"Sending parsed data to Kafka topic {parsed_data_topic}") - future = await producer.send(parsed_data_topic, packed_data) - # .add_callback(on_send_success).add_errback( on_send_error) - - # Attach callbacks to the future - # add_done_callback takes a function as an argument. a lambda is an anonymous function - future.add_done_callback(lambda fut: asyncio.ensure_future(on_send_success(fut.result()))) - future.add_done_callback(lambda fut: fut.add_errback(on_send_error)) - - except Exception as err: - logging.exception(f"parse failed : {err}") - # TODO: send data to spare topic for future reprocessing? 
+ packed_data = await parse_data(unpacked_data, device_data, parser_module_name) + if packed_data: + await produce_parsed_data_message(parsed_data_topic, producer, packed_data) finally: - await consumer.stop() - await producer.stop() + if consumer: + await consumer.stop() + if producer: + await producer.stop() async def main(): From 0b0179fb1bd61eb2d778c3bb2fb1694ce12bd147 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Fri, 13 Oct 2023 09:53:31 +0300 Subject: [PATCH 5/7] add missing docstring --- app.py | 76 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/app.py b/app.py index d0653bf..1e4ea67 100644 --- a/app.py +++ b/app.py @@ -205,30 +205,41 @@ def create_parsed_data_message(timestamp: datetime.datetime, payload: list, devi async def process_kafka_raw_topic(raw_data: bytes): - # TODO doc - unpacked_data = data_unpack(raw_data) - logging.info(pformat(unpacked_data)) - device_id = unpacked_data.get("device_id") - if device_id is None: - logging.warning("Device id not found in raw data - unpacked_data['device_id'] ") - # TODO: store data for future re-processing - return unpacked_data, None, None - device_data = await get_device_data(device_id) - # if device_data is None or "device_metadata" not in device_data: - if device_data is None: - logging.warning(f"Device data not found for device_id: {device_id}") - # TODO: store data for future re-processing - return unpacked_data, None, None + """ + Process raw data received from Kafka. + :param raw_data: Raw data received from Kafka + :return: unpacked_data, device_data, parser_module_name + """ - parser_module_name = device_data.get("parser_module", "") - if parser_module_name == "": - logging.warning("Parser module name not found") - # TODO: store data for future re-processing - return unpacked_data, device_data, None + try: + unpacked_data = data_unpack(raw_data) + logging.info(pformat(unpacked_data)) + + device_id = unpacked_data.get("device_id") + if device_id is None: + logging.warning("Device id not found in raw data - unpacked_data['device_id'] ") + # TODO: store data for future re-processing + return unpacked_data, None, None + device_data = await get_device_data(device_id) + # if device_data is None or "device_metadata" not in device_data: + if device_data is None: + logging.warning(f"Device data not found for device_id: {device_id}") + # TODO: store data for future re-processing + return unpacked_data, None, None + + parser_module_name = device_data.get("parser_module", "") + if parser_module_name == "": + logging.warning("Parser module name not found") + # TODO: store data for future re-processing + return unpacked_data, device_data, None + + print(device_data) + print(f"printing parser module {parser_module_name}") + return unpacked_data, device_data, parser_module_name - print(device_data) - print(f"printing parser module {parser_module_name}") - return unpacked_data, device_data, parser_module_name + except Exception as err: + logging.exception(f"Failed to process raw data: {err}") + return async def initialize_kafka_consumer(raw_data_topic): @@ -250,6 +261,12 @@ async def initialize_kafka_consumer(raw_data_topic): async def produce_parsed_data_message(parsed_data_topic, producer, packed_data): + """ + Produce parsed data message to Kafka + :param parsed_data_topic: Parsed data topic's name + :param producer: AIOKafka producer + :param packed_data: Packed data to be sent to Kafka + """ try: future = await 
producer.send(parsed_data_topic, packed_data) @@ -263,6 +280,13 @@ async def produce_parsed_data_message(parsed_data_topic, producer, packed_data): async def parse_data(unpacked_data, device_data, parser_module_name): + """ + Parse data using parser module + :param unpacked_data: Unpacked data + :param device_data: Device data + :param parser_module_name: Parser module's name + :return: Packed data + """ logging.info("Preparing to parse payload") try: parser_module = importlib.import_module(parser_module_name) @@ -284,6 +308,12 @@ async def parse_data(unpacked_data, device_data, parser_module_name): # TODO group id variable async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer): + """ + Consume raw data from Kafka and parse it + :param raw_data_topic: Raw data topic's name + :param parsed_data_topic: Parsed data topic's name + :param producer: AIOKafka producer + """ try: logging.info(f"Get Kafka consumer for {raw_data_topic}") consumer = await initialize_kafka_consumer(raw_data_topic) @@ -301,6 +331,8 @@ async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, produ await produce_parsed_data_message(parsed_data_topic, producer, packed_data) finally: + logging.info(f"Stopped Listening to {raw_data_topic}") + logging.info("Closing Kafka consumer and producer.") if consumer: await consumer.stop() if producer: From 2e34e4244a33af0ff1853afbbae305714f17f316 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Wed, 18 Oct 2023 21:55:14 +0300 Subject: [PATCH 6/7] update project config --- app.py | 28 ++++++++++++---------------- pyproject.toml | 2 +- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/app.py b/app.py index 1e4ea67..8deeffc 100644 --- a/app.py +++ b/app.py @@ -19,12 +19,13 @@ ) # TODO: for testing, add better defaults (or remove completely to make sure it is set in env) -DEVREG_ENDPOINTS_URL = os.getenv("DEVREG_ENDPOINTS_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/") -DEVREG_API_TOKEN = os.getenv("DEVREG_API_TOKEN", "abcdef1234567890abcdef1234567890abcdef12") +DEVICE_REGISTRY_ENDPOINTS_URL = os.getenv("DEVICE_REGISTRY_ENDPOINTS_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/") +DEVICE_REGISTRY_URL = os.getenv("DEVICE_REGISTRY_URL", "http://127.0.0.1:8000/api/v1/") +DEVICE_REGISTRY_API_TOKEN = os.getenv("DEVICE_REGISTRY_API_TOKEN", "abcdef1234567890abcdef1234567890abcdef12") device_registry_request_headers = { - "Authorization": f"Token {DEVREG_API_TOKEN}", - "User-Agent": "mittaridatapumppu-endpoint/0.1.0", + "Authorization": f"Token {DEVICE_REGISTRY_API_TOKEN}", + "User-Agent": "mittaridatapumppu-parser/0.1.0", "Accept": "application/json", } @@ -55,9 +56,7 @@ async def get_device_data_devreg(device_id: str) -> dict: :return: Device data in a dict """ metadata = {} - devreg_url = os.getenv("DEVICE_REGISTRY_URL") - devreg_token = os.getenv("DEVICE_REGISTRY_TOKEN") - if devreg_url is None or devreg_token is None: + if DEVICE_REGISTRY_URL is None or DEVICE_REGISTRY_API_TOKEN is None: logging.error( "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " "querying device metadata failed" ) @@ -68,16 +67,12 @@ async def get_device_data_devreg(device_id: str) -> dict: # NOTE: creating redis client is very cheap operation, but perhaps it # should be benchmarked? 
Another solution would be to create client once # (like kafka consumer) and re-using it in subsequent calls - url = f"{devreg_url}/devices/{device_id}/" + url = f"{DEVICE_REGISTRY_URL}/devices/{device_id}/" logging.info(f"Querying metadata from {url}") # Get metadata from device registry using httpx - headers = { - "Authorization": f"Token {devreg_token}", - "User-Agent": "mittaridatapumppu-parser/0.0.1", - } async with httpx.AsyncClient() as client: try: - response = await client.get(url, headers=headers) + response = await client.get(url, headers=device_registry_request_headers) if response.status_code == 200: metadata = response.json() logging.debug(metadata) @@ -97,10 +92,10 @@ async def get_kafka_topics_from_device_registry_endpoints(fail_on_error: bool) - # Create request to ENDPOINTS_URL and get data using httpx async with httpx.AsyncClient() as client: try: - response = await client.get(DEVREG_ENDPOINTS_URL, headers=device_registry_request_headers) + response = await client.get(DEVICE_REGISTRY_ENDPOINTS_URL, headers=device_registry_request_headers) if response.status_code == 200: data = response.json() - logging.info(f"Got {len(data['endpoints'])} endpoints from device registry {DEVREG_ENDPOINTS_URL}") + logging.info(f"Got {len(data['endpoints'])} endpoints from device registry {DEVICE_REGISTRY_ENDPOINTS_URL}") endpoint_topic_mappings = {} for endpoint in data["endpoints"]: try: @@ -124,7 +119,7 @@ async def get_kafka_topics_from_device_registry_endpoints(fail_on_error: bool) - return endpoint_topic_mappings except Exception as e: - logging.error(f"Failed to get endpoints from device registry {DEVREG_ENDPOINTS_URL}: {e}") + logging.error(f"Failed to get endpoints from device registry {DEVICE_REGISTRY_ENDPOINTS_URL}: {e}") if fail_on_error: raise e @@ -350,6 +345,7 @@ async def main(): tasks = [] endpoint_topic_mappings = await get_kafka_topics_from_device_registry_endpoints(True) + print(endpoint_topic_mappings) endpoint_topic_mappings.pop("/api/v1/data") endpoints = endpoint_topic_mappings.keys() for endpoint in endpoints: diff --git a/pyproject.toml b/pyproject.toml index 50ec6fb..2397285 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "httpx", "aiokafka", "isodate", - "fvhiot@https://github.com/ForumViriumHelsinki/FVHIoT-python/archive/refs/tags/v0.3.1.zip", + "fvhiot@https://github.com/ForumViriumHelsinki/FVHIoT-python/archive/refs/tags/v0.3.3.zip", "kafka-python", "msgpack", "pydantic", From 3a14e18ce1cee35b4b6dfa20e86b5ea74e160055 Mon Sep 17 00:00:00 2001 From: Sheena <11192150+sheenacodes@users.noreply.github.com> Date: Thu, 19 Oct 2023 00:34:56 +0300 Subject: [PATCH 7/7] change to uniform env names --- app.py | 28 ++++++++++----------- docker-compose.yml | 63 ---------------------------------------------- 2 files changed, 14 insertions(+), 77 deletions(-) delete mode 100644 docker-compose.yml diff --git a/app.py b/app.py index 8deeffc..5208f52 100644 --- a/app.py +++ b/app.py @@ -19,12 +19,12 @@ ) # TODO: for testing, add better defaults (or remove completely to make sure it is set in env) -DEVICE_REGISTRY_ENDPOINTS_URL = os.getenv("DEVICE_REGISTRY_ENDPOINTS_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/") +ENDPOINT_CONFIG_URL = os.getenv("ENDPOINT_CONFIG_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/") DEVICE_REGISTRY_URL = os.getenv("DEVICE_REGISTRY_URL", "http://127.0.0.1:8000/api/v1/") -DEVICE_REGISTRY_API_TOKEN = os.getenv("DEVICE_REGISTRY_API_TOKEN", "abcdef1234567890abcdef1234567890abcdef12") +DEVICE_REGISTRY_TOKEN = 
os.getenv("DEVICE_REGISTRY_TOKEN", "abcdef1234567890abcdef1234567890abcdef12") device_registry_request_headers = { - "Authorization": f"Token {DEVICE_REGISTRY_API_TOKEN}", + "Authorization": f"Token {DEVICE_REGISTRY_TOKEN}", "User-Agent": "mittaridatapumppu-parser/0.1.0", "Accept": "application/json", } @@ -56,7 +56,7 @@ async def get_device_data_devreg(device_id: str) -> dict: :return: Device data in a dict """ metadata = {} - if DEVICE_REGISTRY_URL is None or DEVICE_REGISTRY_API_TOKEN is None: + if DEVICE_REGISTRY_URL is None or DEVICE_REGISTRY_TOKEN is None: logging.error( "DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " "querying device metadata failed" ) @@ -92,10 +92,10 @@ async def get_kafka_topics_from_device_registry_endpoints(fail_on_error: bool) - # Create request to ENDPOINTS_URL and get data using httpx async with httpx.AsyncClient() as client: try: - response = await client.get(DEVICE_REGISTRY_ENDPOINTS_URL, headers=device_registry_request_headers) + response = await client.get(ENDPOINT_CONFIG_URL, headers=device_registry_request_headers) if response.status_code == 200: data = response.json() - logging.info(f"Got {len(data['endpoints'])} endpoints from device registry {DEVICE_REGISTRY_ENDPOINTS_URL}") + logging.info(f"Got {len(data['endpoints'])} endpoints from device registry {ENDPOINT_CONFIG_URL}") endpoint_topic_mappings = {} for endpoint in data["endpoints"]: try: @@ -119,7 +119,7 @@ async def get_kafka_topics_from_device_registry_endpoints(fail_on_error: bool) - return endpoint_topic_mappings except Exception as e: - logging.error(f"Failed to get endpoints from device registry {DEVICE_REGISTRY_ENDPOINTS_URL}: {e}") + logging.error(f"Failed to get endpoints from device registry {ENDPOINT_CONFIG_URL}: {e}") if fail_on_error: raise e @@ -228,8 +228,8 @@ async def process_kafka_raw_topic(raw_data: bytes): # TODO: store data for future re-processing return unpacked_data, device_data, None - print(device_data) - print(f"printing parser module {parser_module_name}") + #print(device_data) + logging.info(f"printing parser module {parser_module_name}") return unpacked_data, device_data, parser_module_name except Exception as err: @@ -293,12 +293,12 @@ async def parse_data(unpacked_data, device_data, parser_module_name): except ModuleNotFoundError as err: logging.critical(f"Importing parser module {parser_module_name} failed: {err}") - return + return None # TODO: store data for future re-processing except Exception as err: logging.exception(f"parsing failed at {parser_module_name} : {err}") # TODO: send data to spare topic for future reprocessing? 
- return + return None # TODO group id variable @@ -319,7 +319,8 @@ async def consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, produ logging.info("Preparing to parse payload") [unpacked_data, device_data, parser_module_name] = await process_kafka_raw_topic(msg.value) - print(f"printing unpacked data {unpacked_data}") + logging.debug(f"printing unpacked data {unpacked_data}") + packed_data = None if parser_module_name: packed_data = await parse_data(unpacked_data, device_data, parser_module_name) if packed_data: @@ -345,14 +346,13 @@ async def main(): tasks = [] endpoint_topic_mappings = await get_kafka_topics_from_device_registry_endpoints(True) - print(endpoint_topic_mappings) + logging.debug(endpoint_topic_mappings) endpoint_topic_mappings.pop("/api/v1/data") endpoints = endpoint_topic_mappings.keys() for endpoint in endpoints: logging.info(f"Setting up parser for path: {endpoint}") e2t_map = endpoint_topic_mappings[endpoint] - print(e2t_map) raw_data_topic = e2t_map["raw_topic"] parsed_data_topic = e2t_map["parsed_topic"] tasks.append(consume_and_parse_data_stream(raw_data_topic, parsed_data_topic, producer)) diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 16cbd79..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,63 +0,0 @@ -# mittaridatapumppu-parser - -version: '3.8' - -services: - parser: - #image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser - build: . - #command: ["python", "./watcher.py"] - # depends_on: - # kafka: - # condition: service_healthy - #restart: unless-stopped - environment: - KAFKA_HOST: "kafka" - KAFKA_PORT: 9092 - KAFKA_BOOTSTRAP_SERVERS: "kafka:9092" - KAFKA_GROUP_ID: "digita_dev" - KAFKA_PARSED_DATA_TOPIC_NAME: "digita.parseddata" - KAFKA_RAW_DATA_TOPIC_NAME: "digita.rawdata" - DEVREG_ENDPOINTS_URL: "http://devreg:8000/api/v1/hosts/localhost/" - DEVICE_REGISTRY_URL: "http://devreg:8000/api/v1" - DEVICE_REGISTRY_TOKEN: abcdef1234567890abcdef1234567890abcdef12 - LOG_LEVEL: "DEBUG" - DEBUG: 1 - DEV_SERVER: 1 - volumes: - - .:/home/app - - ../fvhiot-python/fvhiot:/home/app/fvhiot - networks: - - mittaridatapumppu-dev_dkrnw - -# kafka: -# image: bitnami/kafka:3.4 -# ports: -# - "9092:9092" -# volumes: -# - "kafka_data:/bitnami" -# environment: -# KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" -# # Kafka KRaft settings -# KAFKA_CFG_NODE_ID: 0 -# KAFKA_CFG_PROCESS_ROLES: "controller,broker" -# KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093" -# # Listeners -# KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" -# KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092" -# KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" -# KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" -# KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" -# healthcheck: -# interval: 10s -# retries: 3 -# test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe -# timeout: 5s - -# volumes: -# kafka_data: - - -networks: - mittaridatapumppu-dev_dkrnw: - external: true
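With docker-compose.yml deleted in this last patch, the configuration the final app.py expects is only visible piecemeal across the series. The sketch below pulls it together for the reader: the three environment variable names come from patches 6-7, the mapping shape from get_kafka_topics_from_device_registry_endpoints(), while the endpoint path and topic names are sample values borrowed from the deleted compose file. Kafka connection settings such as KAFKA_BOOTSTRAP_SERVERS are presumably consumed by the fvhiot aiokafka helpers rather than by app.py itself, so they are omitted here.

# Illustrative recap only -- sample values, not a file in this patch series.
import os

# Environment read by the final app.py (patches 6-7); the token default is a placeholder.
ENDPOINT_CONFIG_URL = os.getenv("ENDPOINT_CONFIG_URL", "http://127.0.0.1:8000/api/v1/hosts/localhost/")
DEVICE_REGISTRY_URL = os.getenv("DEVICE_REGISTRY_URL", "http://127.0.0.1:8000/api/v1/")
DEVICE_REGISTRY_TOKEN = os.getenv("DEVICE_REGISTRY_TOKEN", "abcdef1234567890abcdef1234567890abcdef12")

# Shape of the dict returned by get_kafka_topics_from_device_registry_endpoints(),
# keyed by endpoint_path; main() pops "/api/v1/data" and starts one consumer task per remaining entry.
endpoint_topic_mappings = {
    # "/api/v1/digita" is a sample path; real paths come from the device registry response.
    "/api/v1/digita": {
        "raw_topic": "digita.rawdata",
        "parsed_topic": "digita.parseddata",
        "group_id": "digita_dev",
        "endpoint_path": "/api/v1/digita",
    },
}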