Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
craig8 authored Nov 8, 2024
1 parent 28f4f83 commit bf68120
Showing 1 changed file with 0 additions and 179 deletions.
179 changes: 0 additions & 179 deletions src/dnp3_python/dnp3station/station_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,6 @@ def process(self, command: OutstationCmdType, index: int):
"Binary": "BinaryInput",
"BinaryOutputStatus": "BinaryOutput",
}
# self.dnp3_database.add_point(
# command_to_dataclass_type[command.__class__.__name__],
# index,
# command.value,
# str(datetime.datetime.now()),
# # datetime.datetime.now(),
# )


class MyLogger(openpal.ILogHandler):
Expand Down Expand Up @@ -581,47 +574,6 @@ def to_flat_db(db: dict) -> dict:
return db_flat


# def to_pnnl_schema(db: dict, is_wrapped_text=True) -> pd.DataFrame:
# db_flat = to_flat_db(db)
# df_flat = pd.DataFrame(db_flat)

# def dnp3_name_mapping(default_name: str) -> str:
# point_type, point_index = default_name.split("_")[0], default_name.split("_")[1]
# if point_type == "Analog":
# schema_type = "AI"
# elif point_type == "AnalogOutputStatus":
# schema_type = "AO"
# elif point_type == "Binary":
# schema_type = "BI"
# else: # point_type == "BinaryOutputStatus":
# schema_type = "BO"

# return f"{schema_type}-{(point_index).zfill(3)}" # 'padding in-front with zeros up to 3 digits

# df_flat["point_name"] = df_flat["Name"].apply(lambda x: dnp3_name_mapping(x))
# df_points = pd.read_csv("/home/kefei/project/dnp3-python/dnp3-example-schema.csv")

# import textwrap

# def wrap_text(text, width):
# return "\n".join(textwrap.wrap(text, width=width))

# df_merged = pd.merge(
# df_flat[["Value", "point_name"]],
# df_points,
# left_on="point_name",
# right_on="Point",
# ).drop("point_name", axis=1)

# if is_wrapped_text:
# df_merged["Desc_Short"] = df_merged["Desc_Short"].apply(
# lambda x: wrap_text(x, width=30)
# )
# # df_merged["Value"] = df_merged["Value"].apply(lambda x: int(x))

# return df_merged


import datetime
from dataclasses import dataclass, field
from typing import List
Expand Down Expand Up @@ -656,137 +608,6 @@ class Dnp3DatabaseRecord:
# received_at: datetime.datetime


# @dataclass
# class Dnp3Database:
# """
# Represents a database for storing DNP3 data points.

# Attributes:
# rows (List[Dnp3DatabaseRecord]): A list of Dnp3DatabaseRecord instances.

# Example:
# >>> db = Dnp3Database()
# >>> db.add_point("AnalogInput", 1, 123.45, datetime.datetime.now())
# >>> print(db.query_by_type("AnalogInput"))
# >>> sorted_db = db.sort_by_field("index")
# >>> print(sorted_db.rows)
# """

# rows: List[Dnp3DatabaseRecord] = field(default_factory=list)

# def add_point(
# self,
# point_type: str,
# index: int,
# value: str | float | bool | None,
# updated_at: str | datetime.datetime,
# # received_at: datetime.datetime,
# ):
# """
# Adds a new point to the database.

# Args:
# point_type (str): The type of point, must be one of ["AnalogInput", "AnalogOutput", "BinaryInput", "BinaryOutput"].
# index (int): The index of the point.
# value (str | float | bool | None): The value of the point.
# updated_at (datetime.datetime): The timestamp when the point is being updated.
# received_at (datetime.datetime): The timestamp when the point is being received.

# Raises:
# ValueError: If the point_type is not valid.

# Example:
# >>> db.add_point("BinaryInput", 2, True, datetime.datetime.now(), datetime.datetime.now())
# """
# if point_type in ["AnalogInput", "AnalogOutput", "BinaryInput", "BinaryOutput"]:
# new_record = Dnp3DatabaseRecord(point_type, index, value, updated_at)
# self.rows.append(new_record)
# else:
# raise ValueError("Invalid point type")

# def query_by_type(self, point_type: str) -> "Dnp3Database":
# """
# Queries the database for all records of a specified type.

# Args:
# point_type (str): The type of points to query for.

# Returns:
# Dnp3Database: A new Dnp3Database instance containing only the records of the specified type.

# Example:
# >>> db.query_by_type("AnalogInput")
# """
# filtered_rows = [point for point in self.rows if point.point_type == point_type]
# return Dnp3Database(filtered_rows)

# def sort_by_field(
# self, field_name: str, descending: bool = False
# ) -> "Dnp3Database":
# """
# Sorts the database by a specified field.

# Args:
# field_name (str): The name of the field to sort by.
# descending (bool): Whether to sort in descending order.

# Returns:
# Dnp3Database: A new Dnp3Database instance containing the sorted records.

# Raises:
# ValueError: If the field_name does not exist in the records.

# Example:
# >>> db.sort_by_field("updated_at", descending=True)
# """
# if not self.rows:
# return Dnp3Database()
# if hasattr(self.rows[0], field_name):
# sorted_rows = sorted(
# self.rows, key=lambda x: getattr(x, field_name), reverse=descending
# )
# return Dnp3Database(sorted_rows)
# else:
# raise ValueError(f"Field {field_name} does not exist in Point data class")

# def to_csv(self, file_path: str | None = None) -> str:
# """
# Converts the database records to a CSV formatted string.

# Returns:
# str: A string containing the CSV representation of the database records.

# Example:
# >>> db = Dnp3Database()
# >>> db.add_point("AnalogInput", 1, 123.45, datetime.datetime.now(), datetime.datetime.now())
# >>> print(db.to_csv())
# """
# output = io.StringIO()
# writer = csv.DictWriter(
# output,
# fieldnames=["point_type", "index", "value", "updated_at"],
# )
# writer.writeheader()
# for record in self.rows:
# writer.writerow(asdict(record))

# # Ensure the file exists, if not, create it
# output_stream = output.getvalue()
# if file_path:
# try:
# with open(file_path, "w") as f:
# f.write(output_stream)
# _log.info(f"Wrote dnp3-database to {file_path=}")
# except FileNotFoundError:
# # If the directory doesn't exist, you may want to create it
# directory = os.path.dirname(file_path)
# os.makedirs(directory, exist_ok=True)
# # After ensuring the directory exists, try writing again
# with open(file_path, "w") as f:
# f.write(output_stream)
# return output_stream


class Dnp3Database:
"""
DNP3 database representation
Expand Down

0 comments on commit bf68120

Please sign in to comment.