From bf6812083439bcd79b76265a21bb0eda8a29cf2c Mon Sep 17 00:00:00 2001 From: Craig <3979063+craig8@users.noreply.github.com> Date: Fri, 8 Nov 2024 13:15:37 -0800 Subject: [PATCH] Code cleanup --- src/dnp3_python/dnp3station/station_utils.py | 179 ------------------- 1 file changed, 179 deletions(-) diff --git a/src/dnp3_python/dnp3station/station_utils.py b/src/dnp3_python/dnp3station/station_utils.py index df444da..629bace 100644 --- a/src/dnp3_python/dnp3station/station_utils.py +++ b/src/dnp3_python/dnp3station/station_utils.py @@ -534,13 +534,6 @@ def process(self, command: OutstationCmdType, index: int): "Binary": "BinaryInput", "BinaryOutputStatus": "BinaryOutput", } - # self.dnp3_database.add_point( - # command_to_dataclass_type[command.__class__.__name__], - # index, - # command.value, - # str(datetime.datetime.now()), - # # datetime.datetime.now(), - # ) class MyLogger(openpal.ILogHandler): @@ -581,47 +574,6 @@ def to_flat_db(db: dict) -> dict: return db_flat -# def to_pnnl_schema(db: dict, is_wrapped_text=True) -> pd.DataFrame: -# db_flat = to_flat_db(db) -# df_flat = pd.DataFrame(db_flat) - -# def dnp3_name_mapping(default_name: str) -> str: -# point_type, point_index = default_name.split("_")[0], default_name.split("_")[1] -# if point_type == "Analog": -# schema_type = "AI" -# elif point_type == "AnalogOutputStatus": -# schema_type = "AO" -# elif point_type == "Binary": -# schema_type = "BI" -# else: # point_type == "BinaryOutputStatus": -# schema_type = "BO" - -# return f"{schema_type}-{(point_index).zfill(3)}" # 'padding in-front with zeros up to 3 digits - -# df_flat["point_name"] = df_flat["Name"].apply(lambda x: dnp3_name_mapping(x)) -# df_points = pd.read_csv("/home/kefei/project/dnp3-python/dnp3-example-schema.csv") - -# import textwrap - -# def wrap_text(text, width): -# return "\n".join(textwrap.wrap(text, width=width)) - -# df_merged = pd.merge( -# df_flat[["Value", "point_name"]], -# df_points, -# left_on="point_name", -# right_on="Point", -# ).drop("point_name", axis=1) - -# if is_wrapped_text: -# df_merged["Desc_Short"] = df_merged["Desc_Short"].apply( -# lambda x: wrap_text(x, width=30) -# ) -# # df_merged["Value"] = df_merged["Value"].apply(lambda x: int(x)) - -# return df_merged - - import datetime from dataclasses import dataclass, field from typing import List @@ -656,137 +608,6 @@ class Dnp3DatabaseRecord: # received_at: datetime.datetime -# @dataclass -# class Dnp3Database: -# """ -# Represents a database for storing DNP3 data points. - -# Attributes: -# rows (List[Dnp3DatabaseRecord]): A list of Dnp3DatabaseRecord instances. - -# Example: -# >>> db = Dnp3Database() -# >>> db.add_point("AnalogInput", 1, 123.45, datetime.datetime.now()) -# >>> print(db.query_by_type("AnalogInput")) -# >>> sorted_db = db.sort_by_field("index") -# >>> print(sorted_db.rows) -# """ - -# rows: List[Dnp3DatabaseRecord] = field(default_factory=list) - -# def add_point( -# self, -# point_type: str, -# index: int, -# value: str | float | bool | None, -# updated_at: str | datetime.datetime, -# # received_at: datetime.datetime, -# ): -# """ -# Adds a new point to the database. - -# Args: -# point_type (str): The type of point, must be one of ["AnalogInput", "AnalogOutput", "BinaryInput", "BinaryOutput"]. -# index (int): The index of the point. -# value (str | float | bool | None): The value of the point. -# updated_at (datetime.datetime): The timestamp when the point is being updated. -# received_at (datetime.datetime): The timestamp when the point is being received. - -# Raises: -# ValueError: If the point_type is not valid. - -# Example: -# >>> db.add_point("BinaryInput", 2, True, datetime.datetime.now(), datetime.datetime.now()) -# """ -# if point_type in ["AnalogInput", "AnalogOutput", "BinaryInput", "BinaryOutput"]: -# new_record = Dnp3DatabaseRecord(point_type, index, value, updated_at) -# self.rows.append(new_record) -# else: -# raise ValueError("Invalid point type") - -# def query_by_type(self, point_type: str) -> "Dnp3Database": -# """ -# Queries the database for all records of a specified type. - -# Args: -# point_type (str): The type of points to query for. - -# Returns: -# Dnp3Database: A new Dnp3Database instance containing only the records of the specified type. - -# Example: -# >>> db.query_by_type("AnalogInput") -# """ -# filtered_rows = [point for point in self.rows if point.point_type == point_type] -# return Dnp3Database(filtered_rows) - -# def sort_by_field( -# self, field_name: str, descending: bool = False -# ) -> "Dnp3Database": -# """ -# Sorts the database by a specified field. - -# Args: -# field_name (str): The name of the field to sort by. -# descending (bool): Whether to sort in descending order. - -# Returns: -# Dnp3Database: A new Dnp3Database instance containing the sorted records. - -# Raises: -# ValueError: If the field_name does not exist in the records. - -# Example: -# >>> db.sort_by_field("updated_at", descending=True) -# """ -# if not self.rows: -# return Dnp3Database() -# if hasattr(self.rows[0], field_name): -# sorted_rows = sorted( -# self.rows, key=lambda x: getattr(x, field_name), reverse=descending -# ) -# return Dnp3Database(sorted_rows) -# else: -# raise ValueError(f"Field {field_name} does not exist in Point data class") - -# def to_csv(self, file_path: str | None = None) -> str: -# """ -# Converts the database records to a CSV formatted string. - -# Returns: -# str: A string containing the CSV representation of the database records. - -# Example: -# >>> db = Dnp3Database() -# >>> db.add_point("AnalogInput", 1, 123.45, datetime.datetime.now(), datetime.datetime.now()) -# >>> print(db.to_csv()) -# """ -# output = io.StringIO() -# writer = csv.DictWriter( -# output, -# fieldnames=["point_type", "index", "value", "updated_at"], -# ) -# writer.writeheader() -# for record in self.rows: -# writer.writerow(asdict(record)) - -# # Ensure the file exists, if not, create it -# output_stream = output.getvalue() -# if file_path: -# try: -# with open(file_path, "w") as f: -# f.write(output_stream) -# _log.info(f"Wrote dnp3-database to {file_path=}") -# except FileNotFoundError: -# # If the directory doesn't exist, you may want to create it -# directory = os.path.dirname(file_path) -# os.makedirs(directory, exist_ok=True) -# # After ensuring the directory exists, try writing again -# with open(file_path, "w") as f: -# f.write(output_stream) -# return output_stream - - class Dnp3Database: """ DNP3 database representation