
removing device specific parsing from app (they go to device specific parser modules)
sheenacodes committed Oct 11, 2023
1 parent 0338cb1 commit 0b7425c
Showing 2 changed files with 101 additions and 133 deletions.
159 changes: 57 additions & 102 deletions app.py
@@ -1,6 +1,5 @@
import datetime
import importlib
import json
import logging
import os
import pathlib
@@ -9,9 +8,8 @@

import httpx
from kafka.producer.future import RecordMetadata
from pydantic.error_wrappers import ValidationError

from fvhiot.models.thingpark import DevEuiUplink

from fvhiot.utils import init_script
from fvhiot.utils.data import data_unpack, data_pack
from fvhiot.utils.kafka import get_kafka_producer_by_envs, get_kafka_consumer_by_envs
@@ -26,9 +24,7 @@ def backup_messages(raw_data_topic: str, msg):
"""
backupdir = pathlib.Path("messages") / pathlib.Path(raw_data_topic)
backupdir.mkdir(parents=True, exist_ok=True)
backupfile = backupdir / pathlib.Path("{}.msgpack".format(datetime.datetime.utcnow().strftime("%Y-%m-%d")))
try:
with open(backupfile, "ab") as outfile:
outfile.write(msg.value) # msg.value is msgpack.packb()'ed data
@@ -51,13 +47,6 @@ def on_send_error(excp):
logging.error("Error on Kafka producer", exc_info=excp)


# TODO: digita related
def get_uplink_obj(data: dict) -> DevEuiUplink:
body_data = json.loads(data["request"]["body"].decode())
uplink_obj = DevEuiUplink(**body_data["DevEUI_uplink"])
return uplink_obj


def get_device_data_devreg(device_id: str) -> dict:
"""
Get device metadata from device registry
@@ -70,8 +59,7 @@ def get_device_data_devreg(device_id: str) -> dict:
devreg_token = os.getenv("DEVICE_REGISTRY_TOKEN")
if devreg_url is None or devreg_token is None:
logging.error(
"DEVICE_REGISTRY_URL and DEVICE_REGISTRY_TOKEN must be defined, " "querying device metadata failed"
)
return metadata
if device_id is None:
@@ -94,9 +82,7 @@ def get_device_data_devreg(device_id: str) -> dict:
metadata = response.json()
logging.debug(metadata)
else:
logging.warning(f"Device registry returned {response.status_code} {response.text}")
except httpx.HTTPError as err:
logging.exception(f"{err}")

@@ -114,25 +100,12 @@ def get_device_data(device_id: str) -> dict:
if os.getenv("DEVICE_REGISTRY_URL"):
return get_device_data_devreg(device_id)
else:
logging.error("DEVICE_REGISTRY_URL must be defined, querying device metadata failed")
return metadata
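For orientation, a hedged example of the device metadata the registry is expected to return; only the parser_module key is read later in this file, and everything else is an assumption about the registry's schema:

# Illustrative only -- apart from "parser_module", which is read below, the
# registry's response schema is an assumption here.
device_data = {
    "device_id": "A81758FFFE0523AB",               # hypothetical device id
    "parser_module": "fvhiot.parsers.somesensor",  # hypothetical importable module path
    "device_metadata": {},
}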


# TODO: digita related
def parse_payload(
parser_module, payload_hex: str, port: int, timestamp: datetime.datetime
):
# TODO: implement new function in parser_module, which accepts timestamp
val = parser_module.decode_hex(payload_hex, port=port)
return val


# TODO: generic
def create_parsed_data_message(timestamp: datetime.datetime, payload: list, device: dict) -> dict:
"""
Mega function to create parsed data messages.
Data structure loosely follows JTS (Json time series) format.
@@ -145,9 +118,7 @@ def create_parsed_data_message(
time_str = timestamp.isoformat()
parsed_data["meta"] = {
"timestamp_received": "{}".format(time_str),
"timestamp_parsed": "{}".format(
datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat()
),
"timestamp_parsed": "{}".format(datetime.datetime.now(tz=ZoneInfo("UTC")).isoformat()),
}
parsed_data["device"] = device
# header variables
@@ -162,9 +133,7 @@ def create_parsed_data_message(
for d in item["data"].keys():
keys.add(d)
keys = sorted(list(keys)) # now we have all unique keys in sorted list
col_map = {}  # create mapping for silly "0", "1", "2" named columns and real data keys
for k in keys:
col_name = str(col_cnt) # "0", "1", "2" and so on
columns[col_name] = {"name": k}  # e.g. columns["0"] = {"name": "temp"}
@@ -175,9 +144,7 @@ def create_parsed_data_message(
"time": item["time"],
"f": {},
} # take "time" as is, we trust that it is a valid ISO date str
for k, v in sorted(item["data"].items()):  # put keys into "f" in sorted order (same as in header)
col_name = col_map[k]
data_item["f"][col_name] = {"v": v}
parsed_data["data"].append(data_item)
@@ -195,15 +162,40 @@ def create_parsed_data_message(
return parsed_data
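To make the JTS-like structure concrete, a hedged example of one resulting message; the field names follow the code above, while the values and the exact placement of the column map are illustrative:

# Illustrative parsed-data message -- values are invented, and where the column
# map is attached is assumed, since that part of the function is outside this hunk.
parsed_data = {
    "meta": {
        "timestamp_received": "2023-10-11T07:15:00+00:00",
        "timestamp_parsed": "2023-10-11T07:15:01.123456+00:00",
    },
    "device": {"device_id": "A81758FFFE0523AB", "parser_module": "fvhiot.parsers.somesensor"},
    "columns": {"0": {"name": "humi"}, "1": {"name": "temp"}},  # sorted data keys mapped to "0", "1", ...
    "data": [
        {"time": "2023-10-11T07:14:58.000+00:00", "f": {"0": {"v": 55.0}, "1": {"v": 21.3}}},
    ],
}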


def process_kafka_raw_topic(raw_data: bytes):
# TODO doc
unpacked_data = data_unpack(raw_data)
logging.info(pformat(unpacked_data))
device_id = unpacked_data["device_id"]
if device_id is None:
logging.warning("Device id not found in raw data - unpacked_data['device_id'] ")
# TODO: store data for future re-processing
return unpacked_data, None, None
device_data = get_device_data(device_id)
# if device_data is None or "device_metadata" not in device_data:
if device_data is None:
logging.warning(f"Device data not found for device_id: {device_id}")
# TODO: store data for future re-processing
return unpacked_data, None, None

parser_module_name = device_data.get("parser_module", "")
if parser_module_name == "":
logging.warning("Parser module name not found")
# TODO: store data for future re-processing
return unpacked_data, device_data, None

print(device_data)
print(f"printing parser module {parser_module_name}")
return unpacked_data, device_data, parser_module_name
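As a hedged sketch, the unpacked raw message handled above is assumed to look roughly like this; only device_id is dereferenced by process_kafka_raw_topic, and the request part mirrors what the removed get_uplink_obj used to read:

# Assumed shape -- only "device_id" is required here; the raw HTTP body is left
# for the device-specific parser module to interpret.
unpacked_data = {
    "device_id": "A81758FFFE0523AB",  # hypothetical DevEUI-style id
    "request": {"body": b'{"DevEUI_uplink": {"payload_hex": "00d500e1", "FPort": 1}}'},
}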


def main():
init_script()
raw_data_topic = "cesva.rawdata"  # os.getenv("KAFKA_RAW_DATA_TOPIC_NAME")
parsed_data_topic = os.getenv("KAFKA_PARSED_DATA_TOPIC_NAME")
logging.info(f"Get Kafka consumer for {raw_data_topic} and producer for {parsed_data_topic}")
# Create Kafka consumer for incoming raw data messages
consumer = get_kafka_consumer_by_envs(raw_data_topic)  # listen to multiple topics -> ?
producer = get_kafka_producer_by_envs()
if consumer is None or producer is None:
logging.critical("Kafka connection failed, exiting.")
@@ -213,62 +205,25 @@ def main():
logging.info("Parser is waiting for raw data messages from Kafka.")
for msg in consumer:
logging.info("Preparing to parse payload")
data = data_unpack(msg.value)
logging.info(pformat(data))
# if os.getenv("DEBUG"):
# backup_messages(raw_data_topic, msg)
data["raw_data_topic"] = raw_data_topic # TODO: add this value in endpoint
try:
uplink_obj = get_uplink_obj(data) # TODO: handle errors here
except KeyError as err:
logging.warning(f"KeyError '{err}', message has no DevEUI_uplink key?")
continue
except ValidationError as err:
logging.warning(f"ValidationError '{err}'")
continue
device_data = get_device_data(uplink_obj.DevEUI)
# if device_data is None or "device_metadata" not in device_data:
if device_data is None:
logging.warning(f"Device data for {uplink_obj.DevEUI} not found.")
# TODO: store data for future re-processing
continue
# print(type(device_data), device_data)
# TODO: use better datetime parser?
packet_timestamp = datetime.datetime.strptime(
uplink_obj.Time, "%Y-%m-%dT%H:%M:%S.%f%z"
)
parser_module_name = device_data.get("parser_module", "")
if parser_module_name == "":
logging.warning("Parser module name not found")
# TODO: store data for future re-processing
continue

logging.info(
f"Trying to parse hex payload {uplink_obj.payload_hex} with "
f"{parser_module_name}"
)
try:
parser_module = importlib.import_module(parser_module_name)
except ModuleNotFoundError as err:
logging.warning(f"Importing parser module failed: {err}")
# TODO: store data for future re-processing
continue
try:
print(uplink_obj.payload_hex, uplink_obj.FPort, uplink_obj.Time)
datalines = parser_module.create_datalines(
uplink_obj.payload_hex, port=uplink_obj.FPort, time_str=uplink_obj.Time
)
parsed_data = create_parsed_data_message(
packet_timestamp, datalines, device_data
)
logging.debug(pformat(parsed_data))
packed_data = data_pack(parsed_data)
producer.send(parsed_data_topic, packed_data).add_callback(
on_send_success
).add_errback(on_send_error)
except Exception as err:
logging.exception(f"Failed to get parser module: {err}")
# TODO: send data to spare topic for future reprocessing?
unpacked_data, device_data, parser_module_name = process_kafka_raw_topic(msg.value)
print(f"printing unpacked data {unpacked_data}")
if parser_module_name:
try:
parser_module = importlib.import_module(parser_module_name)
except ModuleNotFoundError as err:
logging.warning(f"Importing parser module failed: {err}")
# TODO: store data for future re-processing
continue
try:
packet_timestamp, datalines = parser_module.create_datalines_from_raw_unpacked_data(unpacked_data)
parsed_data = create_parsed_data_message(packet_timestamp, datalines, device_data)
logging.debug(pformat(parsed_data))
packed_data = data_pack(parsed_data)
producer.send(parsed_data_topic, packed_data).add_callback(on_send_success).add_errback(on_send_error)
except Exception as err:
logging.exception(f"Failed to parse or forward data: {err}")
# TODO: send data to spare topic for future reprocessing?


if __name__ == "__main__":
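The refactored loop hands all device-specific decoding to the module named in the device metadata. Below is a minimal sketch of such a parser module; only the create_datalines_from_raw_unpacked_data(unpacked_data) interface and the dataline shape come from app.py above, while the payload location and decoding are illustrative assumptions modelled on the removed Digita-specific code:

# Hypothetical device-specific parser module, e.g. importable as "fvhiot.parsers.somesensor".
# The function name and its (timestamp, datalines) return value follow app.py above;
# how the payload is located and decoded is an assumption for illustration.
import datetime
import json


def create_datalines_from_raw_unpacked_data(unpacked_data: dict) -> tuple:
    body = json.loads(unpacked_data["request"]["body"].decode())  # assumed location of the uplink
    uplink = body["DevEUI_uplink"]  # Thingpark-style uplink (cf. removed get_uplink_obj)
    timestamp = datetime.datetime.strptime(uplink["Time"], "%Y-%m-%dT%H:%M:%S.%f%z")
    payload_hex = uplink["payload_hex"]
    # Device-specific decoding: two big-endian 16-bit values, scaled by 10 (illustrative).
    temp = int(payload_hex[0:4], 16) / 10.0
    humi = int(payload_hex[4:8], 16) / 10.0
    datalines = [{"time": uplink["Time"], "data": {"temp": temp, "humi": humi}}]
    return timestamp, datalines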
75 changes: 44 additions & 31 deletions docker-compose.yml
@@ -3,48 +3,61 @@
version: '3.8'

services:
parser-digita:
image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser
parser:
#image: ghcr.io/city-of-helsinki/mittaridatapumppu-parser
build: .
depends_on:
kafka:
condition: service_healthy
restart: unless-stopped
#command: ["python", "./watcher.py"]
# depends_on:
# kafka:
# condition: service_healthy
#restart: unless-stopped
environment:
KAFKA_HOST: "kafka"
KAFKA_PORT: 9092
KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
KAFKA_GROUP_ID: "digita_dev"
KAFKA_PARSED_DATA_TOPIC_NAME: "digita.parseddata"
KAFKA_RAW_DATA_TOPIC_NAME: "digita.rawdata"
DEVREG_ENDPOINTS_URL: "http://devreg:8000/api/v1/hosts/localhost/"
DEVICE_REGISTRY_URL: "http://devreg:8000/api/v1"
DEVICE_REGISTRY_TOKEN: b48455759b691baf3b811ba437ce9e581fc0a37e
DEVICE_REGISTRY_TOKEN: abcdef1234567890abcdef1234567890abcdef12
LOG_LEVEL: "DEBUG"
DEBUG: 1
DEV_SERVER: 1
kafka:
image: bitnami/kafka:3.4
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
# Kafka KRaft settings
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
# Listeners
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
healthcheck:
interval: 10s
retries: 3
test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe
timeout: 5s
- .:/home/app
- ../fvhiot-python/fvhiot:/home/app/fvhiot
networks:
- mittaridatapumppu-dev_dkrnw

# kafka:
# image: bitnami/kafka:3.4
# ports:
# - "9092:9092"
# volumes:
# - "kafka_data:/bitnami"
# environment:
# KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
# # Kafka KRaft settings
# KAFKA_CFG_NODE_ID: 0
# KAFKA_CFG_PROCESS_ROLES: "controller,broker"
# KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
# # Listeners
# KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
# KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
# KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
# KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
# KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
# healthcheck:
# interval: 10s
# retries: 3
# test: kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --create --if-not-exists && kafka-topics.sh --bootstrap-server kafka:9092 --topic hc --describe
# timeout: 5s

# volumes:
# kafka_data:


volumes:
kafka_data:
networks:
mittaridatapumppu-dev_dkrnw:
external: true
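Note that the parser service now joins a pre-existing Docker network instead of starting its own Kafka broker. Assuming the network is not already created by the surrounding mittaridatapumppu-dev stack, it can be created manually with docker network create mittaridatapumppu-dev_dkrnw before running docker compose up.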
