diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index ad9004c..2644a92 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -10,7 +10,6 @@ import json import re import os -import math import ast from typing import TYPE_CHECKING @@ -103,12 +102,12 @@ def fromString(cls, name : str): "UINT32" : "UINT", "INT32" : "INT" } - + if name in alias: name = alias[name] return getattr(cls, name) - + @classmethod def getSize(cls, data_type : 'Data_Type'): sizes = { @@ -121,16 +120,16 @@ def getSize(cls, data_type : 'Data_Type'): Data_Type._16BIT_FLAGS : 16, Data_Type._32BIT_FLAGS : 32 } - + if data_type in sizes: return sizes[data_type] if data_type.value > 400: #signed magnitude bits return data_type.value-400 - + if data_type.value > 300: #signed bits return data_type.value-300 - + if data_type.value > 200: #unsigned bits return data_type.value-200 @@ -168,7 +167,7 @@ def fromString(cls, name : str): "YES" : "WRITE", "WO" : "WRITEONLY" } - + if name in alias: name = alias[name] else: @@ -182,20 +181,20 @@ class Registry_Type(Enum): HOLDING = 0x03 INPUT = 0x04 - + @dataclass class registry_map_entry: registry_type : Registry_Type register : int register_bit : int - register_byte : int + register_byte : int ''' byte offset for canbus ect... ''' variable_name : str documented_name : str unit : str unit_mod : float concatenate : bool - concatenate_registers : list[int] + concatenate_registers : list[int] values : list value_regex : str = "" @@ -219,16 +218,15 @@ class registry_map_entry: next_read_timestamp : int = 0 ''' unix timestamp in ms ''' - write_mode : WriteMode = WriteMode.READ ''' enable disable reading/writing ''' - + def __str__(self): return self.variable_name def __eq__(self, other): - return ( isinstance(other, registry_map_entry) - and self.register == other.register + return ( isinstance(other, registry_map_entry) + and self.register == other.register and self.register_bit == other.register_bit and self.registry_type == other.registry_type and self.register_byte == other.register_byte) @@ -309,7 +307,7 @@ def __init__(self, protocol : str, transport_settings : 'SectionProxy' = None, s def get_registry_map(self, registry_type : Registry_Type = Registry_Type.ZERO) -> list[registry_map_entry]: return self.registry_map[registry_type] - + def get_registry_ranges(self, registry_type : Registry_Type) -> list[registry_map_entry]: return self.registry_map_ranges[registry_type] @@ -317,18 +315,18 @@ def get_registry_ranges(self, registry_type : Registry_Type) -> list[registry_ma def get_holding_registry_entry(self, name : str): ''' deprecated ''' return self.get_registry_entry(name, registry_type=Registry_Type.HOLDING) - + def get_input_registry_entry(self, name : str): ''' deprecated ''' return self.get_registry_entry(name, registry_type=Registry_Type.INPUT) def get_registry_entry(self, name : str, registry_type : Registry_Type) -> registry_map_entry: - + name = name.strip().lower().replace(' ', '_') #clean name for item in self.registry_map[registry_type]: if item.documented_name == name: return item - + return None def load__json(self, file : str = '', settings_dir : str = ''): @@ -358,7 +356,7 @@ def load__json(self, file : str = '', settings_dir : str = ''): def load_registry_overrides(self, override_path, keys : list[str]): """Load overrides into a multidimensional dictionary keyed by each specified key.""" overrides = {key: {} for key in keys} - + with open(override_path, newline='', encoding='latin-1') as csvfile: reader = csv.DictReader(csvfile) for row in reader: @@ -371,7 +369,7 @@ def load_registry_overrides(self, override_path, keys : list[str]): return overrides - def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INPUT) -> list[registry_map_entry]: + def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INPUT) -> list[registry_map_entry]: registry_map : list[registry_map_entry] = [] register_regex = re.compile(r'(?P(?:0?x[\da-z]+|[\d]+))\.(b(?Px?\d{1,2})|(?Px?\d{1,2}))') @@ -393,7 +391,7 @@ def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INP if not os.path.exists(path): #return empty is file doesnt exist. return registry_map - + overrides : dict[str, dict] = None override_keys = ['documented name', 'register'] @@ -407,13 +405,13 @@ def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INP self._log.info("loading override file: " + override_path) overrides = self.load_registry_overrides(override_path, override_keys) - + def determine_delimiter(first_row) -> str: if first_row.count(';') > first_row.count(','): return ';' else: - return ',' - + return ',' + def process_row(row): # Initialize variables to hold numeric and character parts unit_multiplier : float = 1 @@ -425,8 +423,8 @@ def process_row(row): row['documented name'] = row['documented name'].strip().lower().replace(' ', '_') #region read_interval - - + + if 'read interval' in row: row['read interval'] = row['read interval'].lower() #ensure is all lower case match = read_interval_regex.search(row['read interval']) @@ -452,7 +450,7 @@ def process_row(row): #region overrides - if overrides != None: + if overrides is not None: #apply overrides using documented name or register override_row = None # Check each key in order until a match is found @@ -462,7 +460,7 @@ def process_row(row): override_row = overrides[key][key_value] overrided_keys.add(key_value) break - + # Apply non-empty override values if an override row is found if override_row: for field, override_value in override_row.items(): @@ -471,7 +469,7 @@ def process_row(row): #endregion overrides - #region unit + #region unit #if or is in the unit; ignore unit if "or" in row['unit'].lower() or ":" in row['unit'].lower(): @@ -491,7 +489,7 @@ def process_row(row): #convert to float try: unit_multiplier = float(unit_multiplier) - except: + except Exception: unit_multiplier = float(1) if unit_multiplier == 0: @@ -502,12 +500,12 @@ def process_row(row): variable_name = row['variable name'] if row['variable name'] else row['documented name'] variable_name = variable_name.strip().lower().replace(' ', '_').replace('__', '_') #clean name - + if re.search(r"[^a-zA-Z0-9\_]", variable_name) : self._log.warning("Invalid Name : " + str(variable_name) + " reg: " + str(row['register']) + " doc name: " + str(row['documented name']) + " path: " + str(path)) - if not variable_name and not row['documented name']: #skip empty entry / no name. todo add more invalidator checks. + if not variable_name and not row['documented name']: #skip empty entry / no name. todo add more invalidator checks. return #region data type @@ -523,7 +521,7 @@ def process_row(row): else: data_type = Data_Type.fromString(row['data type']) - + if 'values' not in row: row['values'] = "" self._log.warning("No Value Column : path: " + str(path)) @@ -575,7 +573,7 @@ def process_row(row): #value_regex val_match = ascii_value_regex.search(row['values']) if val_match: - value_regex = val_match.group('regex') + value_regex = val_match.group('regex') matched = True if not matched: #single value @@ -624,7 +622,7 @@ def process_row(row): else: for i in range(start, end+1): concatenate_registers.append(i) - + if concatenate_registers: r = range(len(concatenate_registers)) else: @@ -645,7 +643,7 @@ def process_row(row): if "write" in row: writeMode = WriteMode.fromString(row['write']) - + for i in r: item = registry_map_entry( registry_type = registry_type, @@ -672,19 +670,19 @@ def process_row(row): register = register + 1 - + with open(path, newline='', encoding='latin-1') as csvfile: #clean column names before passing to csv dict reader - delimeter = ';' + delimeter = ';' first_row = next(csvfile).lower().replace('_', ' ') if first_row.count(';') < first_row.count(','): delimeter = ',' first_row = re.sub(r"\s+" + re.escape(delimeter) +"|" + re.escape(delimeter) +r"\s+", delimeter, first_row) #trim values - csvfile = itertools.chain([first_row], csvfile) #add clean header to begining of iterator + csvfile = itertools.chain([first_row], csvfile) #add clean header to begining of iterator # Create a CSV reader object reader = csv.DictReader(csvfile, delimiter=delimeter) @@ -693,33 +691,33 @@ def process_row(row): for row in reader: process_row(row) - if overrides != None: + if overrides is not None: # Add any unmatched overrides as new entries... probably need to add some better error handling to ensure entry isnt empty ect... for key in override_keys: applied = False for key_value, override_row in overrides[key].items(): # Check if both keys are unique before applying - if all(override_row.get(k) for k in override_keys): + if all(override_row.get(k) for k in override_keys): if all(override_row.get(k) not in overrided_keys for k in override_keys): self._log.info("Loading unique entry from overrides for both unique keys") process_row(override_row) - + # Mark both keys as applied for k in override_keys: overrided_keys.add(override_row.get(k)) - + applied = True break # Exit inner loop after applying unique entry - if applied: + if applied: continue - + for index in reversed(range(len(registry_map))): item = registry_map[index] if index > 0: #if high/low, its a double if ( - item.documented_name.endswith('_l') + item.documented_name.endswith('_l') and registry_map[index-1].documented_name.replace('_h', '_l') == item.documented_name ): combined_item = registry_map[index-1] @@ -733,7 +731,7 @@ def process_row(row): if combined_item.documented_name == combined_item.variable_name: combined_item.variable_name = combined_item.variable_name[:-2].strip() - + combined_item.documented_name = combined_item.documented_name[:-2].strip() if not combined_item.unit: #fix inconsistsent documentation @@ -747,24 +745,25 @@ def process_row(row): for index in reversed(range(len(registry_map))): item = registry_map[index] if ( - item.documented_name.strip().lower() not in self.variable_mask + item.documented_name.strip().lower() not in self.variable_mask and item.variable_name.strip().lower() not in self.variable_mask ): del registry_map[index] - #apply variable screen + #apply variable screen if self.variable_screen: for index in reversed(range(len(registry_map))): item = registry_map[index] if ( - item.documented_name.strip().lower() in self.variable_mask + item.documented_name.strip().lower() in self.variable_mask and item.variable_name.strip().lower() in self.variable_mask ): - del registry_map[index] + del registry_map[index] return registry_map - + def calculate_registry_ranges(self, map : list[registry_map_entry], max_register : int, init : bool = False) -> list[tuple]: + ''' read optimization; calculate which ranges to read''' max_batch_size = 45 #see manual; says max batch is 45 @@ -782,7 +781,7 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register timestamp_ms = 0 while (start := start+max_batch_size) <= max_register: - + registers : list[int] = [] #use a list, im too lazy to write logic end = start+max_batch_size @@ -793,7 +792,7 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register if register.write_mode == WriteMode.WRITEONLY: ##Write Only; skip continue - #we are assuming calc registry ranges is being called EVERY READ. + #we are assuming calc registry ranges is being called EVERY READ. if register.next_read_timestamp < timestamp_ms: register.next_read_timestamp = timestamp_ms + register.read_interval registers.append(register.register) @@ -807,13 +806,13 @@ def find_protocol_file(self, file : str, base_dir : str = '' ) -> str: path = base_dir + '/' + file if os.path.exists(path): return path - + suffix = file.split('_', 1)[0] path = base_dir + '/' + suffix +'/' + file if os.path.exists(path): return path - + #find file by name, recurisvely. last resort search_pattern = os.path.join(base_dir, '**', file) matches = glob.glob(search_pattern, recursive=True) @@ -839,7 +838,7 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = '', sett self.registry_map[registry_type] = self.load__registry(path, registry_type) size : int = 0 - + #get max register size for item in self.registry_map[registry_type]: if item.register > size: @@ -855,7 +854,7 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma register = registry[entry.register][0] #can bus uses tuple for timestamp else: register = registry[entry.register] - + if entry.register_byte > 0: register = register[entry.register_byte:] @@ -881,13 +880,13 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma #handle custom sizes, less than 1 register end_bit = flag_size + start_bit - + if entry.documented_name+'_codes' in self.codes: code_key : str = entry.documented_name+'_codes' flags : list[str] = [] flag_indexes : list[str] = [] for i in range(start_bit, end_bit): # Iterate over each bit position (0 to 15) - byte = i // 8 + byte = i // 8 bit = i % 8 val = register[byte] # Check if the i-th bit is set @@ -906,7 +905,7 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma bits = multibit_flag.split('&') # Split key into 'bits' if all(bit in flag_indexes_set for bit in bits): # Check if all bits are present in the flag_indexes_set flags.append(self.codes[code_key][multibit_flag]) - + value = ",".join(flags) else: flags : list[str] = [] @@ -968,10 +967,10 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma entry.documented_name+'_codes' in self.codes): try: cleanval = str(int(value)) - + if cleanval in self.codes[entry.documented_name+'_codes']: value = self.codes[entry.documented_name+'_codes'][cleanval] - except: + except Exception: #do nothing; try is for intval value = value @@ -983,7 +982,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma if entry.data_type == Data_Type.UINT: #read uint if entry.register + 1 not in registry: return - + value = float((registry[entry.register] << 16) + registry[entry.register + 1]) elif entry.data_type == Data_Type.SHORT: #read signed short val = registry[entry.register] @@ -998,7 +997,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma elif entry.data_type == Data_Type.INT: #read int if entry.register + 1 not in registry: return - + combined_value_unsigned = (registry[entry.register] << 16) + registry[entry.register + 1] # Convert the combined unsigned value to a signed integer if necessary @@ -1011,7 +1010,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma #value = struct.unpack(' dict[str,str]: '''process registry into appropriate datatypes and names -- maybe add func for single entry later?''' - + concatenate_registry : dict = {} info = {} for entry in map: if entry.register not in registry: continue - value = '' + value = '' if isinstance(registry[entry.register], bytes): value = self.process_register_bytes(registry, entry) else: value = self.process_register_ushort(registry, entry) - + #if item.unit: # value = str(value) + item.unit if entry.concatenate: @@ -1144,7 +1143,7 @@ def process_registery(self, registry : Union[dict[int, int], dict[int, bytes]] , return info def validate_registry_entry(self, entry : registry_map_entry, val) -> int: - #if code, validate first. + #if code, validate first. if entry.documented_name+'_codes' in self.codes: if val in self.codes[entry.documented_name+'_codes']: return 1 @@ -1162,7 +1161,9 @@ def validate_registry_entry(self, entry : registry_map_entry, val) -> int: if int(val) >= entry.value_min and int(val) <= entry.value_max: return 1 - return 0 + self._log.error(f"validate_registry_entry fail overall {entry.value_min} / {entry.value_max} / {int(val)}") + + return 0 def evaluate_expressions(self, expression, variables : dict[str,str]): # Define the register string @@ -1215,7 +1216,7 @@ def evaluate_ranges(expression): return results - def evaluate_expression(expression): + def evaluate_expression(expression): # Define a regular expression pattern to match "maths" var_pattern = re.compile(r'\[(?P.*?)\]') @@ -1224,15 +1225,15 @@ def replace_vars(match): try: maths = match.group("maths") maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them - + # Parse the expression safely tree = ast.parse(maths, mode='eval') # Evaluate the expression end_value = eval(compile(tree, filename='', mode='eval')) - + return str(end_value) - except : + except Exception: return match.group(0) # Replace variables with their values @@ -1252,5 +1253,5 @@ def replace_vars(match): for r in results: print(evaluate_expression(r)) - -#settings = protocol_settings('v0.14') \ No newline at end of file + +#settings = protocol_settings('v0.14') diff --git a/classes/transports/canbus.py b/classes/transports/canbus.py index a212070..f715817 100644 --- a/classes/transports/canbus.py +++ b/classes/transports/canbus.py @@ -9,8 +9,8 @@ from .transport_base import transport_base -from ..protocol_settings import Data_Type, Registry_Type, registry_map_entry, protocol_settings -from defs.common import strtobool, strtoint +from ..protocol_settings import Registry_Type, registry_map_entry, protocol_settings +from defs.common import strtoint from collections import OrderedDict from typing import TYPE_CHECKING @@ -46,7 +46,7 @@ class canbus(transport_base): cacheTimeout : int = 120 ''' seconds to keep message in cache ''' - emptyTime : float = None + emptyTime : float = None ''' the last time values were read for watchdog''' watchDogTime : float = 120 @@ -93,7 +93,7 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti thread = threading.Thread(target=self.start_loop) - thread.daemon = True + thread.daemon = True thread.start() self.connected = True @@ -116,21 +116,21 @@ def is_socketcan_up(self) -> bool: if not self.linux: self._log.error("socketcan status not implemented for windows") return True - + try: with open(f'/sys/class/net/{self.port}/operstate', 'r') as f: state = f.read().strip() return state == 'up' except FileNotFoundError: return False - + def start_loop(self): self.read_bus(self.bus) def read_bus(self, bus : can.BusABC): ''' read canbus asynco and store results in cache''' msg = None #fix scope bug - + while True: try: msg = self.bus.recv() # This will be non-blocking with asyncio @@ -145,13 +145,13 @@ def read_bus(self, bus : can.BusABC): except Exception as e: # Handle unexpected errors self._log.error(f"An unexpected error occurred: {e}") - + if msg: self._log.info(f"Received message: {msg.arbitration_id:X}, data: {msg.data}") - - with self.lock: - #convert bytearray to bytes; were working with bytes. + + with self.lock: + #convert bytearray to bytes; were working with bytes. self.cache[msg.arbitration_id] = (bytes(msg.data), time.time()) #time.sleep(1) no need for sleep because recv is blocking @@ -159,27 +159,27 @@ def read_bus(self, bus : can.BusABC): def clean_cache(self): current_time = time.time() - + with self.lock: # Create a list of keys to remove (don't remove while iterating) keys_to_delete = [msg_id for msg_id, (_, timestamp) in self.cache.items() if current_time - timestamp > self.cacheTimeout] - + # Remove old messages from the dictionary for key in keys_to_delete: del self.cache[key] def init_after_connect(self): return True - + ''' todo, a startup phase to get serial number''' #from transport_base settings if self.write_enabled: self.enable_write() #if sn is empty, attempt to autoread it - if not self.device_serial_number: + if not self.device_serial_number: self.device_serial_number = self.read_serial_number() - + def read_serial_number(self) -> str: ''' not so simple in canbus''' return '' @@ -187,7 +187,7 @@ def read_serial_number(self) -> str: print("read SN: " +serial_number) if serial_number: return serial_number - + sn2 = "" sn3 = "" fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5'] @@ -200,19 +200,19 @@ def read_serial_number(self) -> str: if not hasattr(data, 'registers') or data.registers is None: self._log.critical("Failed to get serial number register ("+field+") ; exiting") exit() - + serial_number = serial_number + str(data.registers[0]) data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big') - sn2 = sn2 + str(data_bytes.decode('utf-8')) + sn2 = sn2 + str(data_bytes.decode('utf-8')) sn3 = str(data_bytes.decode('utf-8')) + sn3 time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest - + print(sn2) print(sn3) - - if not re.search("[^a-zA-Z0-9\_]", sn2) : + + if not re.search(r"[^a-zA-Z0-9\_]", sn2) : serial_number = sn2 return serial_number @@ -236,7 +236,7 @@ def read_data(self) -> dict[str, str]: new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(Registry_Type.ZERO)) - info.update(new_info) + info.update(new_info) currentTime = time.time() @@ -255,13 +255,13 @@ def read_data(self) -> dict[str, str]: def read_variable(self, variable_name : str, registry_type : Registry_Type, entry : registry_map_entry = None): ''' read's variable from cache''' - ##clean for convinecne + ##clean for convinecne if variable_name: variable_name = variable_name.strip().lower().replace(' ', '_') registry_map = self.protocolSettings.get_registry_map(registry_type) - if entry == None: + if entry is None: for e in registry_map: if e.variable_name == variable_name: entry = e @@ -274,4 +274,4 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr results = self.protocolSettings.process_register_bytes(self.cache, entry) return results[entry.variable_name] else: - return None #empty \ No newline at end of file + return None #empty diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index 9ab9e3d..9b0839c 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -25,8 +25,8 @@ class modbus_base(transport_base): clients : dict[str, 'BaseModbusClient'] = {} ''' str is identifier, dict of clients when multiple transports use the same ports ''' - #non-static here for reference, type hinting, python bs ect... - modbus_delay_increament : float = 0.05 + #non-static here for reference, type hinting, python bs ect... + modbus_delay_increament : float = 0.05 ''' delay adjustment every error. todo: add a setting for this ''' modbus_delay_setting : float = 0.85 @@ -77,7 +77,7 @@ def init_after_connect(self): self.enable_write() #if sn is empty, attempt to autoread it - if not self.device_serial_number: + if not self.device_serial_number: self.device_serial_number = self.read_serial_number() self.update_identifier() @@ -85,13 +85,13 @@ def connect(self): if self.connected and self.first_connect: self.first_connect = False self.init_after_connect() - + def read_serial_number(self) -> str: serial_number = str(self.read_variable("Serial Number", Registry_Type.HOLDING)) self._log.info("read SN: " +serial_number) if serial_number: return serial_number - + sn2 = "" sn3 = "" fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5'] @@ -104,18 +104,18 @@ def read_serial_number(self) -> str: if not hasattr(data, 'registers') or data.registers is None: self._log.critical("Failed to get serial number register ("+field+") ; exiting") exit() - + serial_number = serial_number + str(data.registers[0]) data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big') - sn2 = sn2 + str(data_bytes.decode('utf-8')) + sn2 = sn2 + str(data_bytes.decode('utf-8')) sn3 = str(data_bytes.decode('utf-8')) + sn3 time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest - + print(sn2) print(sn3) - + if not re.search("[^a-zA-Z0-9_]", sn2) : serial_number = sn2 @@ -164,7 +164,7 @@ def read_data(self) -> dict[str, str]: if False: new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()} - info.update(new_info) + info.update(new_info) if not info: self._log.info("Register is Empty; transport busy?") @@ -175,7 +175,7 @@ def validate_protocol(self, protocolSettings : 'protocol_settings') -> float: score_percent = self.validate_registry(Registry_Type.HOLDING) return score_percent - + def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) -> float: score : float = 0 info = {} @@ -188,7 +188,7 @@ def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) if value.concatenate and value.register != value.concatenate_registers[0]: #only eval concated values once evaluate = False - + if evaluate: score = score + self.protocolSettings.validate_registry_entry(value, info[value.variable_name]) @@ -196,7 +196,7 @@ def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) percent = score*100/maxScore self._log.info("validation score: " + str(score) + " of " + str(maxScore) + " : " + str(round(percent)) + "%") return percent - + def analyze_protocol(self, settings_dir : str = 'protocols'): print("=== PROTOCOL ANALYZER ===") protocol_names : list[str] = [] @@ -265,7 +265,7 @@ def analyze_protocol(self, settings_dir : str = 'protocols'): #very well possible the registers will be incomplete due to different hardware sizes #so dont assume they are set / complete #we'll see about the behaviour. if it glitches, this could be a way to determine protocol. - + input_register_score : dict[str, int] = {} holding_register_score : dict[str, int] = {} @@ -283,8 +283,8 @@ def evaluate_score(entry : registry_map_entry, val): if entry.value_regex: #regex validation if re.match(entry.value_regex, val): - mod = mod * 2 - else: + mod = mod * 2 + else: mod = mod * -2 #regex validation failed, double damage! score = score + (2 * mod) #double points for ascii @@ -303,18 +303,18 @@ def evaluate_score(entry : registry_map_entry, val): return score - + for name, protocol in protocols.items(): input_register_score[name] = 0 holding_register_score[name] = 0 - #very rough percentage. tood calc max possible score. + #very rough percentage. tood calc max possible score. input_valid_count[name] = 0 holding_valid_count[name] = 0 #process registry based on protocol input_info = protocol.process_registery(input_registry, protocol.registry_map[Registry_Type.INPUT]) holding_info = protocol.process_registery(input_registry, protocol.registry_map[Registry_Type.HOLDING]) - + for entry in protocol.registry_map[Registry_Type.INPUT]: if entry.variable_name in input_info: @@ -336,7 +336,7 @@ def evaluate_score(entry : registry_map_entry, val): holding_register_score[name] = holding_register_score[name] + score - + protocol_scores: dict[str, int] = {} #combine scores for name, protocol in protocols.items(): @@ -347,23 +347,21 @@ def evaluate_score(entry : registry_map_entry, val): print("=== "+str(name)+" - "+str(protocol_scores[name])+" ===") print("input register score: " + str(input_register_score[name]) + "; valid registers: "+str(input_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.INPUT)))) print("holding register score : " + str(holding_register_score[name]) + "; valid registers: "+str(holding_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.HOLDING)))) - - + + def write_variable(self, entry : registry_map_entry, value : str, registry_type : Registry_Type = Registry_Type.HOLDING): """ writes a value to a ModBus register; todo: registry_type to handle other write functions""" #read current value current_registers = self.read_modbus_registers(start=entry.register, end=entry.register, registry_type=registry_type) - results = self.protocolSettings.process_registery(current_registers, self.protocolSettings.get_registry_map(registry_type)) current_value = current_registers[entry.register] - if not self.protocolSettings.validate_registry_entry(entry, current_value): - raise ValueError("Invalid value in register. unsafe to write") #i need to figure out a better error handler for theese. - + raise ValueError(f"Invalid value in register '{current_value}'. Unsafe to write") + if not self.protocolSettings.validate_registry_entry(entry, value): - raise ValueError("Invalid new value. unsafe to write") - + raise ValueError(f"Invalid new value, '{value}'. Unsafe to write") + #handle codes if entry.variable_name+"_codes" in self.protocolSettings.codes: codes = self.protocolSettings.codes[entry.variable_name+"_codes"] @@ -402,21 +400,21 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type raise ValueError("something went wrong bitwise") else: raise TypeError("Unsupported data type") - - if ushortValue == None: + + if ushortValue is None: raise ValueError("Invalid value - None") self.write_register(entry.register, ushortValue) def read_variable(self, variable_name : str, registry_type : Registry_Type, entry : registry_map_entry = None): - ##clean for convinecne + ##clean for convinecne if variable_name: variable_name = variable_name.strip().lower().replace(' ', '_') registry_map = self.protocolSettings.get_registry_map(registry_type) - if entry == None: + if entry is None: for e in registry_map: if e.variable_name == variable_name: entry = e @@ -431,18 +429,18 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr else: start = entry.register end = max(entry.concatenate_registers) - + registers = self.read_modbus_registers(start=start, end=end, registry_type=registry_type) results = self.protocolSettings.process_registery(registers, registry_map) return results[entry.variable_name] - + def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = 45, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: ''' maybe move this to transport_base ?''' if not ranges: #ranges is empty, use min max - if start == 0 and end == None: + if start == 0 and end is None: return {} #empty - + end = end + 1 ranges = [] start = start - batch_size @@ -468,7 +466,7 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en try: register = self.read_registers(range[0], range[1], registry_type=registry_type) - except ModbusIOException as e: + except ModbusIOException as e: self._log.error("ModbusIOException : ", e.error_code) if e.error_code == 4: #if no response; probably time out. retry with increased delay isError = True @@ -479,14 +477,14 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en if isinstance(register, bytes) or register.isError() or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string if isinstance(register, bytes): self._log.error(register.decode('utf-8')) - else: + else: self._log.error(register.__str__) self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy if self.modbus_delay > 60: #max delay. 60 seconds between requests should be way over kill if it happens self.modbus_delay = 60 - if retry > retries: #instead of none, attempt to continue to read. but with no retires. + if retry > retries: #instead of none, attempt to continue to read. but with no retires. continue else: #undo step in loop and retry read @@ -495,17 +493,17 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en self._log.warning("Retry("+str(retry)+" - ("+str(total_retries)+")) range("+str(index)+")") index = index - 1 continue - elif self.modbus_delay > self.modbus_delay_setting: #no error, decrease delay + elif self.modbus_delay > self.modbus_delay_setting: #no error, decrease delay self.modbus_delay -= self.modbus_delay_increament if self.modbus_delay < self.modbus_delay_setting: self.modbus_delay = self.modbus_delay_setting - - + + retry -= 1 if retry < 0: retry = 0 - + #combine registers into "registry" i = -1 while(i := i + 1 ) < range[1]: @@ -513,12 +511,12 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en registry[i+range[0]] = register.registers[i] return registry - + def read_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) -> dict[str,str]: map = self.protocolSettings.get_registry_map(registry_type) if not map: return {} - + registry = self.read_modbus_registers(self.protocolSettings.get_registry_ranges(registry_type), registry_type=registry_type) info = self.protocolSettings.process_registery(registry, map) - return info \ No newline at end of file + return info diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index d79b808..561658c 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -1,4 +1,3 @@ -import logging from classes.protocol_settings import Registry_Type, protocol_settings import inspect @@ -18,25 +17,21 @@ class modbus_rtu(modbus_base): port : str = "/dev/ttyUSB0" addresses : list[int] = [] baudrate : int = 9600 - client : ModbusSerialClient + client : ModbusSerialClient pymodbus_slave_arg = 'unit' def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): - #logger = logging.getLogger(__name__) - #logging.basicConfig(level=logging.DEBUG) - super().__init__(settings, protocolSettings=protocolSettings) - self.port = settings.get("port", "") if not self.port: raise ValueError("Port is not set") - - self.port = find_usb_serial_port(self.port) + + self.port = find_usb_serial_port(self.port) if not self.port: raise ValueError("Port is not valid / not found") - + print("Serial Port : " + self.port + " = ", get_usb_serial_port_info(self.port)) #print for config convience if "baud" in self.protocolSettings.settings: @@ -46,7 +41,7 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings address : int = settings.getint("address", 0) self.addresses = [address] - + # pymodbus compatability; unit was renamed to address if 'slave' in inspect.signature(ModbusSerialClient.read_holding_registers).parameters: self.pymodbus_slave_arg = 'slave' @@ -60,22 +55,22 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings if client_str in modbus_base.clients: self.client = modbus_base.clients[client_str] return - + if 'method' in init_signature.parameters: - self.client = ModbusSerialClient(method='rtu', port=self.port, - baudrate=int(self.baudrate), + self.client = ModbusSerialClient(method='rtu', port=self.port, + baudrate=int(self.baudrate), stopbits=1, parity='N', bytesize=8, timeout=2 ) else: self.client = ModbusSerialClient( - port=self.port, - baudrate=int(self.baudrate), + port=self.port, + baudrate=int(self.baudrate), stopbits=1, parity='N', bytesize=8, timeout=2 ) - + #add to clients modbus_base.clients[client_str] = self.client - + def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if 'unit' not in kwargs: @@ -89,11 +84,11 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr return self.client.read_input_registers(address=start, count=count, **kwargs) elif registry_type == Registry_Type.HOLDING: return self.client.read_holding_registers(address=start, count=count, **kwargs) - + def write_register(self, register : int, value : int, **kwargs): if not self.write_enabled: - return - + return + if 'unit' not in kwargs: kwargs = {'unit': self.addresses[0], **kwargs} @@ -105,4 +100,4 @@ def write_register(self, register : int, value : int, **kwargs): def connect(self): self.connected = self.client.connect() - super().connect() \ No newline at end of file + super().connect() diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index bdf62d9..22ce4af 100644 --- a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -1,4 +1,3 @@ -import logging import inspect from classes.protocol_settings import Registry_Type, protocol_settings @@ -16,17 +15,14 @@ class modbus_tcp(modbus_base): port : str = 502 host : str = "" - client : ModbusTcpClient + client : ModbusTcpClient pymodbus_slave_arg = 'unit' def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): - #logger = logging.getLogger(__name__) - #logging.basicConfig(level=logging.DEBUG) - self.host = settings.get("host", "") if not self.host: raise ValueError("Host is not set") - + self.port = settings.getint("port", self.port) # pymodbus compatability; unit was renamed to address @@ -45,7 +41,7 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings modbus_base.clients[client_str] = self.client super().__init__(settings, protocolSettings=protocolSettings) - + def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if 'unit' not in kwargs: @@ -59,7 +55,7 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr return self.client.read_input_registers(start, count, **kwargs ) elif registry_type == Registry_Type.HOLDING: return self.client.read_holding_registers(start, count, **kwargs) - + def connect(self): self.connected = self.client.connect() - super().connect() \ No newline at end of file + super().connect() diff --git a/classes/transports/modbus_tls.py b/classes/transports/modbus_tls.py index 2afeb0d..c6b464a 100644 --- a/classes/transports/modbus_tls.py +++ b/classes/transports/modbus_tls.py @@ -1,4 +1,3 @@ -import logging from classes.protocol_settings import Registry_Type, protocol_settings from pymodbus.client.sync import ModbusTlsClient from .transport_base import transport_base @@ -11,15 +10,12 @@ class modbus_udp(transport_base): hostname : str = "" ''' optional for cert ''' - + certfile : str = "" keyfile : str = "" - client : ModbusTlsClient + client : ModbusTlsClient def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): - #logger = logging.getLogger(__name__) - #logging.basicConfig(level=logging.DEBUG) - self.host = settings.get("host", "") if not self.host: raise ValueError("Host is not set") @@ -29,19 +25,19 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.certfile = settings.get("certfile", "") if not self.certfile: raise ValueError("certfile is not set") - + self.keyfile = settings.get("keyfile", "") if not self.keyfile: raise ValueError("keyfile is not set") - + self.hostname = settings.get("hostname", self.host) - self.client = ModbusTlsClient(host=self.host, + self.client = ModbusTlsClient(host=self.host, hostname = self.hostname, certfile = self.certfile, keyfile = self.keyfile, - port=self.port, - timeout=7, + port=self.port, + timeout=7, retries=3) super().__init__(settings, protocolSettings=protocolSettings) @@ -53,4 +49,4 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr def connect(self): self.connected = self.client.connect() - super().connect() \ No newline at end of file + super().connect() diff --git a/classes/transports/modbus_udp.py b/classes/transports/modbus_udp.py index 9f4533b..a9f77b4 100644 --- a/classes/transports/modbus_udp.py +++ b/classes/transports/modbus_udp.py @@ -1,4 +1,3 @@ -import logging from classes.protocol_settings import Registry_Type, protocol_settings from pymodbus.client.sync import ModbusUdpClient from .transport_base import transport_base @@ -8,22 +7,19 @@ class modbus_udp(transport_base): port : int = 502 host : str = "" - client : ModbusUdpClient + client : ModbusUdpClient def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): - #logger = logging.getLogger(__name__) - #logging.basicConfig(level=logging.DEBUG) - self.host = settings.get("host", "") if not self.host: raise ValueError("Host is not set") - + self.port = settings.getint("port", self.port) self.client = ModbusUdpClient(host=self.host, port=self.port, timeout=7, retries=3) super().__init__(settings, protocolSettings=protocolSettings) - + def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if registry_type == Registry_Type.INPUT: return self.client.read_input_registers(start, count, **kwargs) @@ -32,4 +28,4 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr def connect(self): self.connected = self.client.connect() - super().connect() \ No newline at end of file + super().connect() diff --git a/classes/transports/mqtt.py b/classes/transports/mqtt.py index 5600aae..ae5bd17 100644 --- a/classes/transports/mqtt.py +++ b/classes/transports/mqtt.py @@ -1,8 +1,7 @@ import atexit -import logging import random import time -import json +import json import warnings import paho.mqtt.client @@ -27,13 +26,11 @@ class mqtt(transport_base): discovery_enabled : bool = False json : bool = False reconnect_delay : int = 7 - ''' seconds ''' + """ seconds """ reconnect_attempts : int = 21 - #max_precision : int = - 1 - holding_register_prefix : str = "" input_register_prefix : str = "" @@ -48,7 +45,7 @@ def __init__(self, settings : SectionProxy): self.host = settings.get('host', fallback="") if not self.host: raise ValueError("Host is not set") - + self.port = settings.getint('port', fallback=self.port) self.base_topic = settings.get('base_topic', fallback=self.base_topic).rstrip('/') self.error_topic = settings.get('error_topic', fallback=self.error_topic).rstrip('/') @@ -73,7 +70,7 @@ def __init__(self, settings : SectionProxy): if not username: raise ValueError("User is not set") - + if not password: warnings.warn("MQTT Password is empty", RuntimeWarning) @@ -95,7 +92,7 @@ def __init__(self, settings : SectionProxy): self.write_enabled = True #set default super().__init__(settings) - + def connect(self): self._log.info("mqtt connect") @@ -112,7 +109,7 @@ def exit_handler(self): self._log.warning("MQTT Exiting...") self.client.publish( self.base_topic + '/' + self.device_identifier + "/availability","offline") return - + def mqtt_reconnect(self): self._log.info("Disconnected from MQTT Broker!") if self.__reconnecting != 0: #stop double calls @@ -122,22 +119,22 @@ def mqtt_reconnect(self): self.__reconnecting = time.time() try: self._log.warning("Attempting to reconnect("+str(attempt)+")...") - if random.randint(0,1): #alternate between methods because built in reconnect might be unreliable. + if random.randint(0,1): #alternate between methods because built in reconnect might be unreliable. self.client.reconnect() else: self.client.loop_stop() self.client.connect(str(self.host), int(self.port), 60) self.client.loop_start() - #sleep to give a chance to reconnect. - time.sleep(self.reconnect_delay) + #sleep to give a chance to reconnect. + time.sleep(self.reconnect_delay) if self.connected: self.__reconnecting = 0 return - except: + except Exception: self._log.warning("Reconnection failed. Retrying in "+str(self.reconnect_delay)+" second(s)...") time.sleep(self.reconnect_delay) - + #failed to reonnect self._log.critical("Failed to Reconnect, Too many attempts") self.__reconnecting = 0 @@ -155,14 +152,14 @@ def on_connect(self, client, userdata, flags, rc): def write_data(self, data : dict[str, str], from_transport : transport_base): if not self.write_enabled: - return - + return + if self.connected: self.connected = self.client.is_connected() - - self._log.info(f"write data from [{from_transport.transport_name}] to mqtt transport") - self._log.info(data) - #have to send this every loop, because mqtt doesnt disconnect when HA restarts. HA bug. + + self._log.info(f"write data from [{from_transport.transport_name}] to mqtt transport") + self._log.info(data) + #have to send this every loop, because mqtt doesnt disconnect when HA restarts. HA bug. info = self.client.publish(self.base_topic + '/' + from_transport.device_identifier + "/availability","online", qos=0,retain=True) if info.rc == MQTT_ERR_NO_CONN: self.connected = False @@ -173,7 +170,7 @@ def write_data(self, data : dict[str, str], from_transport : transport_base): self.client.publish(self.base_topic+'/'+from_transport.device_identifier, json_object, 0, properties=self.mqtt_properties) else: for entry, val in data.items(): - if isinstance(val, float) and self.max_precision >= 0: #apply max_precision on mqtt transport + if isinstance(val, float) and self.max_precision >= 0: #apply max_precision on mqtt transport val = round(val, self.max_precision) self.client.publish(str(self.base_topic+'/'+from_transport.device_identifier+'/'+entry).lower(), str(val)) @@ -189,7 +186,7 @@ def client_on_message(self, client, userdata, msg): #self.write_variable(entry, value=str(msg.payload.decode('utf-8'))) def init_bridge(self, from_transport : transport_base): - + if from_transport.write_enabled: self.__write_topics = {} #subscribe to write topics @@ -217,7 +214,7 @@ def mqtt_discovery(self, from_transport : transport_base): registry_map : list[registry_map_entry] = [] for entries in from_transport.protocolSettings.registry_map.values(): - registry_map.extend(entries) + registry_map.extend(entries) length = len(registry_map) count = 0 @@ -226,7 +223,7 @@ def mqtt_discovery(self, from_transport : transport_base): if item.concatenate and item.register != item.concatenate_registers[0]: continue #skip all except the first register so no duplicates - + if item.write_mode == WriteMode.READDISABLED: #disabled continue @@ -254,25 +251,25 @@ def mqtt_discovery(self, from_transport : transport_base): writePrefix = "" if from_transport.write_enabled and ( item.write_mode == WriteMode.WRITE or item.write_mode == WriteMode.WRITEONLY ): - writePrefix = "" #home assistant doesnt like write prefix + writePrefix = "" #home assistant doesnt like write prefix disc_payload['state_topic'] = self.base_topic + '/' +from_transport.device_identifier + writePrefix+ "/"+clean_name - + if item.unit: disc_payload['unit_of_measurement'] = item.unit discovery_topic = self.discovery_topic+"/sensor/HN-" + from_transport.device_serial_number + writePrefix + "/" + disc_payload['name'].replace(' ', '_') + "/config" - + self.client.publish(discovery_topic, json.dumps(disc_payload),qos=1, retain=True) - + #send WO message to indicate topic is write only if item.write_mode == WriteMode.WRITEONLY: self.client.publish(disc_payload['state_topic'], "WRITEONLY") - + time.sleep(0.07) #slow down for better reliability - + self.client.publish(disc_payload['availability_topic'],"online",qos=0, retain=True) print() - self._log.info("Published HA "+str(count)+"x Discovery Topics") \ No newline at end of file + self._log.info("Published HA "+str(count)+"x Discovery Topics") diff --git a/classes/transports/pace.py b/classes/transports/pace.py index 0bc0c48..6e28885 100644 --- a/classes/transports/pace.py +++ b/classes/transports/pace.py @@ -3,19 +3,15 @@ import logging from classes.protocol_settings import Registry_Type from pymodbus.client.sync import ModbusSerialClient, BaseModbusClient -from pymodbus.transaction import ModbusRtuFramer +from pymodbus.transaction import ModbusRtuFramer from pymodbus.factory import ClientDecoder from pymodbus.constants import Defaults -from pymodbus.exceptions import ModbusIOException -from pymodbus.exceptions import InvalidMessageReceivedException from pymodbus.utilities import checkCRC, computeCRC -from pymodbus.utilities import hexlify_packets, ModbusTransactionState from pymodbus.compat import byte2int -from pymodbus.framer import ModbusFramer, FRAME_HEADER, BYTE_ORDER +from pymodbus.framer import FRAME_HEADER, BYTE_ORDER -import logging _logger = logging.getLogger(__name__) RTU_FRAME_HEADER = BYTE_ORDER + FRAME_HEADER @@ -112,7 +108,7 @@ def calculate_crc(puchMsg, usDataLen): def calcCRC_3(data, size): crcHi = 0XFF crcLo = 0xFF - + crcHiTable = [ 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, @@ -190,10 +186,10 @@ def buildPacket(self, message): packet = struct.pack(RTU_FRAME_HEADER, message.unit_id, 0x03) + data - - size = len(packet) + crc = computeCRC(packet) - crc2 = calculate_crc(packet,size) +# size = len(packet) +# crc2 = calculate_crc(packet, size) #packet struct: #slave address - 0x01 - 0x10 @@ -206,7 +202,7 @@ def buildPacket(self, message): #crc hi #error 0x4 = bad CRC. so standard CRC is correct - + #crc16 = (modbusdata[bufferIndex] * 0x0100) + modbusdata[bufferIndex + 1] #metCRC16 = self.calcCRC16(modbusdata, bufferIndex) @@ -215,7 +211,7 @@ def buildPacket(self, message): message.transaction_id = message.unit_id # Ensure that transaction is actually the unit id for serial comms return packet - + def checkFrame(self): """ Check if the next frame is available. @@ -239,8 +235,8 @@ def checkFrame(self): return False except (IndexError, KeyError, struct.error): return False - -class CustomModbusSerialClient(ModbusSerialClient): + +class CustomModbusSerialClient(ModbusSerialClient): def __init__(self, method='ascii', **kwargs): """ Initialize a serial client instance @@ -297,20 +293,17 @@ class pace: port : str = "/dev/ttyUSB0" baudrate : int = 9600 - client : CustomModbusSerialClient + client : CustomModbusSerialClient def __init__(self, settings : dict[str,str]): - logger = logging.getLogger(__name__) - logging.basicConfig(level=logging.DEBUG) - if "port" in settings: self.port = settings["port"] if "baudrate" in settings: self.baudrate = settings["baudrate"] - self.client = CustomModbusSerialClient(method='binary', port=self.port, - baudrate=int(self.baudrate), + self.client = CustomModbusSerialClient(method='binary', port=self.port, + baudrate=int(self.baudrate), stopbits=1, parity='N', bytesize=8, timeout=2 ) @@ -319,8 +312,8 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr return self.client.read_input_registers(start, count, **kwargs) elif registry_type == Registry_Type.HOLDING: return self.client.read_holding_registers(start, count, **kwargs) - + time.sleep(4) def connect(self): - self.client.connect() \ No newline at end of file + self.client.connect() diff --git a/classes/transports/serial_frame_client.py b/classes/transports/serial_frame_client.py index 30aec00..6778075 100644 --- a/classes/transports/serial_frame_client.py +++ b/classes/transports/serial_frame_client.py @@ -63,11 +63,11 @@ def read(self, reset_buffer = True, frames = 1) -> list[bytes] | bytes: buffer = bytearray() self.pending_frames.clear() - #for shatty order sensitive protocols. + #for shatty order sensitive protocols. # Clear input buffers if reset_buffer: self.client.reset_input_buffer() - + timedout = time.time() + self.timeout self.client.timeout = self.timeout frameCount = 0 @@ -93,11 +93,11 @@ def read(self, reset_buffer = True, frames = 1) -> list[bytes] | bytes: eoi_index = buffer.find(self.eoi) if eoi_index != -1: - + frame = buffer[len(self.soi):eoi_index] if frames == 1: return frame - + if frameCount > 1: # Extract and store the complete frame self.pending_frames.append(frame) @@ -110,7 +110,7 @@ def read(self, reset_buffer = True, frames = 1) -> list[bytes] | bytes: # Find next SOI index in the remaining buffer soi_index = buffer.find(self.soi) - + else: # If no EOI is found and buffer size exceeds max_frame_size, clear buffer if len(buffer) > self.max_frame_size: @@ -164,5 +164,5 @@ def read_thread(self): self.on_message(frame) time.sleep(0.01) - - + + diff --git a/classes/transports/serial_pylon.py b/classes/transports/serial_pylon.py index b4f6ee0..fcc309f 100644 --- a/classes/transports/serial_pylon.py +++ b/classes/transports/serial_pylon.py @@ -47,7 +47,7 @@ class serial_pylon(transport_base): #this format is pretty common; i need a name for it. SOI : bytes = b'\x7e' # aka b"~" - VER : bytes = b'\x00' + VER : bytes = b'\x00' ''' version has to be fetched first ''' ADR : bytes CID1 : bytes @@ -64,8 +64,8 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti self.port = settings.get("port", "") if not self.port: raise ValueError("Port is not set") - - self.port = find_usb_serial_port(self.port) + + self.port = find_usb_serial_port(self.port) print("Serial Port : " + self.port + " = "+get_usb_serial_port_info(self.port)) #print for config convience self.baudrate = settings.getint("baudrate", 9600) @@ -76,10 +76,10 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti self.ADR = struct.pack('B', address) #todo, multi address support later - self.client = serial_frame_client(self.port, - self.baudrate, - self.SOI, - self.EOI, + self.client = serial_frame_client(self.port, + self.baudrate, + self.SOI, + self.EOI, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, exclusive=True) @@ -97,7 +97,7 @@ def connect(self): self._log.info("pylon protocol version is "+str(version)) self.VER = version - name = self.read_variable('battery_name') + name = self.read_variable('battery_name') self._log.info(name) pass @@ -105,13 +105,13 @@ def read_data(self): info = {} registry_map = self.protocolSettings.get_registry_map() - + data : dict [int, bytes] = {} for entry in registry_map: - + if entry.register not in data: #todo: need to check send data. later. command = entry.register #CID1 and CID2 combined creates a single ushort - self.send_command(command) + self.send_command(command) frame = self.client.read() if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") raw = getattr(self.decode_frame(frame), 'info') @@ -127,23 +127,23 @@ def read_data(self): return info def read_variable(self, variable_name : str, entry : 'registry_map_entry' = None, attribute : str = 'info'): - ##clean for convinecne + ##clean for convinecne if variable_name: variable_name = variable_name.strip().lower().replace(' ', '_') registry_map = self.protocolSettings.get_registry_map() - if entry == None: + if entry is None: for e in registry_map: if e.variable_name == variable_name: entry = e break - + if entry: #entry.concatenate this protocol probably doesnt require concatenate, since info is variable length. command = entry.register #CID1 and CID2 combined creates a single ushort - self.send_command(command) + self.send_command(command) frame = self.client.read() if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") raw = getattr(self.decode_frame(frame), attribute) @@ -169,7 +169,6 @@ def calculate_checksum(self, data): return checksum def decode_frame(self, raw_frame: bytes) -> bytes: - b4 = raw_frame raw_frame = bytes(raw_frame) frame_data = raw_frame[0:-4] @@ -196,7 +195,7 @@ def decode_frame(self, raw_frame: bytes) -> bytes: #todo, process info return data - + def build_frame(self, command : int, info: bytes = b''): ''' builds frame without soi and eoi; that is left for frame client''' @@ -210,7 +209,7 @@ def build_frame(self, command : int, info: bytes = b''): info_length = (lenid_invert_plus_one << 12) + lenid - + self.VER = b'\x20' #protocol is in ASCII hex. :facepalm: @@ -229,10 +228,9 @@ def build_frame(self, command : int, info: bytes = b''): #self.decode_frame(frame) return frame - - + + def send_command(self, cmd, info: bytes = b''): data = self.build_frame(cmd, info) self.client.write(data) - \ No newline at end of file diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index 287415c..adeb282 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -33,7 +33,7 @@ class transport_base: _log : logging.Logger = None def __init__(self, settings : 'SectionProxy') -> None: - + self.transport_name = settings.name #section name #apply log level to logger @@ -42,8 +42,8 @@ def __init__(self, settings : 'SectionProxy') -> None: self._log : logging.Logger = logging.getLogger(short_name + f"[{self.transport_name}]") self._log.setLevel(self._log_level) - - self.type = self.__class__.__name__ + + self.type = self.__class__.__name__ if settings: self.device_serial_number = settings.get(["device_serial_number", "serial_number"], self.device_serial_number) @@ -57,9 +57,8 @@ def __init__(self, settings : 'SectionProxy') -> None: else: self.write_enabled = settings.getboolean("write", self.write_enabled) - #load a protocol_settings class for every transport; required for adv features. ie, variable timing. - #must load after settings + #must load after settings self.protocol_version = settings.get('protocol_version') if self.protocol_version: self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings) @@ -68,7 +67,7 @@ def __init__(self, settings : 'SectionProxy') -> None: self.protocol_version = self.protocolSettings.protocol #todo, reimplement default settings from protocolsettings - + self.update_identifier() @@ -94,8 +93,10 @@ def write_data(self, data : dict[str, registry_map_entry], from_transport : 'tra #lets convert this to dict[str, registry_map_entry] def read_data(self) -> dict[str,str]: - ''' general purpose read function for between transports; - return type may be changed to dict[str, registrsy_map_entry]. still thinking about this''' + ''' + general purpose read function for between transports; + return type may be changed to dict[str, registrsy_map_entry]. still thinking about this + ''' pass @@ -118,4 +119,4 @@ def analyse_protocol(self): def validate_protocol(self, protocolSettings : 'protocol_settings') -> float: ''' validates protocol''' pass - #endregion \ No newline at end of file + #endregion diff --git a/defs/common.py b/defs/common.py index 8cedea5..a28c371 100644 --- a/defs/common.py +++ b/defs/common.py @@ -11,15 +11,15 @@ def strtobool (val): val = val.lower() if val in ('y', 'yes', 't', 'true', 'on', '1'): return 1 - + return 0 def strtoint(val : str) -> int: ''' converts str to int, but allows for hex string input, identified by x prefix''' - if isinstance(val, int): #is already int. + if isinstance(val, int): #is already int. return val - + val = val.lower().strip() if val and val[0] == 'x': @@ -29,7 +29,7 @@ def strtoint(val : str) -> int: val = '0' + val return int.from_bytes(bytes.fromhex(val), byteorder='big') - + if val and val.startswith("0x"): val = val[2:] # Pad the string with a leading zero @@ -37,25 +37,25 @@ def strtoint(val : str) -> int: val = '0' + val return int.from_bytes(bytes.fromhex(val), byteorder='big') - + if not val: #empty return 0 - + return int(val) def get_usb_serial_port_info(port : str = '') -> str: for p in serial.tools.list_ports.comports(): if str(p.device).upper() == port.upper(): return "["+hex(p.vid)+":"+hex(p.pid)+":"+str(p.serial_number)+":"+str(p.location)+"]" - + return "" def find_usb_serial_port(port : str = '', vendor_id : str = '', product_id : str = '', serial_number : str = '', location : str = '') -> str: if not port.startswith('['): return port - + port = port.replace('None', '') - + match = re.match(r"\[(?P[\da-zA-Z]+|):?(?P[\da-zA-Z]+|):?(?P[\da-zA-Z]+|):?(?P[\d\-]+|)\]", port) if match: vendor_id = int(match.group("vendor"), 16) if match.group("vendor") else '' @@ -72,5 +72,5 @@ def find_usb_serial_port(port : str = '', vendor_id : str = '', product_id : st else: print("Bad Port Pattern", port) return None - - return None \ No newline at end of file + + return None diff --git a/documentation/.scripts/generate_indexes.py b/documentation/.scripts/generate_indexes.py index 159e742..a9214a0 100644 --- a/documentation/.scripts/generate_indexes.py +++ b/documentation/.scripts/generate_indexes.py @@ -24,7 +24,7 @@ def generate_readme(directory : str, folder_order : str = [], output_file : str with open(note_file, "r", encoding="utf-8") as note: readme.write(note.read()) - + previous_folder = "" folder_lines : dict[str, list[str]] = {} @@ -40,7 +40,7 @@ def generate_readme(directory : str, folder_order : str = [], output_file : str # Create a bold header for each new folder folder_lines[relative_folder] = [] folder_lines[relative_folder].append(f"**{relative_folder}**\n\n") - + previous_folder = relative_folder #generate index in folder @@ -49,7 +49,7 @@ def generate_readme(directory : str, folder_order : str = [], output_file : str for file in files: file_path = os.path.relpath(os.path.join(root, file), directory).replace("\\", "/") #use linux path structure file_path = urllib.parse.quote(file_path) - + if file == "README.md": #skip continue @@ -91,4 +91,4 @@ def generate_readme(directory : str, folder_order : str = [], output_file : str # Specify the directory you want to index directory_to_index = "../" - generate_readme(directory_to_index, ["3rdparty", "3rdparty/protocols"]) \ No newline at end of file + generate_readme(directory_to_index, ["3rdparty", "3rdparty/protocols"]) diff --git a/protocol_gateway.py b/protocol_gateway.py index 32ab131..1a4590a 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -20,34 +20,33 @@ import argparse -import atexit import os import logging import sys import traceback from configparser import ConfigParser, NoOptionError -from classes.protocol_settings import protocol_settings,Data_Type,registry_map_entry,Registry_Type,WriteMode +from classes.protocol_settings import protocol_settings,registry_map_entry from classes.transports.transport_base import transport_base __logo = """ -██████╗ ██╗ ██╗████████╗██╗ ██╗ ██████╗ ███╗ ██╗ -██╔══██╗╚██╗ ██╔╝╚══██╔══╝██║ ██║██╔═══██╗████╗ ██║ -██████╔╝ ╚████╔╝ ██║ ███████║██║ ██║██╔██╗ ██║ -██╔═══╝ ╚██╔╝ ██║ ██╔══██║██║ ██║██║╚██╗██║ -██║ ██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║ -╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ - +██████╗ ██╗ ██╗████████╗██╗ ██╗ ██████╗ ███╗ ██╗ +██╔══██╗╚██╗ ██╔╝╚══██╔══╝██║ ██║██╔═══██╗████╗ ██║ +██████╔╝ ╚████╔╝ ██║ ███████║██║ ██║██╔██╗ ██║ +██╔═══╝ ╚██╔╝ ██║ ██╔══██║██║ ██║██║╚██╗██║ +██║ ██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║ +╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ + ██████╗ ██████╗ ██████╗ ████████╗ ██████╗ ██████╗ ██████╗ ██╗ ██████╗ █████╗ ████████╗███████╗██╗ ██╗ █████╗ ██╗ ██╗ ██╔══██╗██╔══██╗██╔═══██╗╚══██╔══╝██╔═══██╗██╔════╝██╔═══██╗██║ ██╔════╝ ██╔══██╗╚══██╔══╝██╔════╝██║ ██║██╔══██╗╚██╗ ██╔╝ -██████╔╝██████╔╝██║ ██║ ██║ ██║ ██║██║ ██║ ██║██║ ██║ ███╗███████║ ██║ █████╗ ██║ █╗ ██║███████║ ╚████╔╝ -██╔═══╝ ██╔══██╗██║ ██║ ██║ ██║ ██║██║ ██║ ██║██║ ██║ ██║██╔══██║ ██║ ██╔══╝ ██║███╗██║██╔══██║ ╚██╔╝ -██║ ██║ ██║╚██████╔╝ ██║ ╚██████╔╝╚██████╗╚██████╔╝███████╗ ╚██████╔╝██║ ██║ ██║ ███████╗╚███╔███╔╝██║ ██║ ██║ -╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ - -""" +██████╔╝██████╔╝██║ ██║ ██║ ██║ ██║██║ ██║ ██║██║ ██║ ███╗███████║ ██║ █████╗ ██║ █╗ ██║███████║ ╚████╔╝ +██╔═══╝ ██╔══██╗██║ ██║ ██║ ██║ ██║██║ ██║ ██║██║ ██║ ██║██╔══██║ ██║ ██╔══╝ ██║███╗██║██╔══██║ ╚██╔╝ +██║ ██║ ██║╚██████╔╝ ██║ ╚██████╔╝╚██████╗╚██████╔╝███████╗ ╚██████╔╝██║ ██║ ██║ ███████╗╚███╔███╔╝██║ ██║ ██║ +╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ + +""" # noqa: W291 class CustomConfigParser(ConfigParser): @@ -58,7 +57,7 @@ def get(self, section, option, *args, **kwargs): if 'fallback' in kwargs: #override kwargs fallback, for manually handling here fallback = kwargs['fallback'] kwargs['fallback'] = None - + for name in option: value = super().get(section, name, *args, **kwargs) if value: @@ -67,14 +66,14 @@ def get(self, section, option, *args, **kwargs): if not value: value = fallback - if value == None: + if value is None: raise NoOptionError(option[0], section) else: value = super().get(section, option, *args, **kwargs) if isinstance(value, int): return value - + return value.strip() if value is not None else value class Protocol_Gateway: @@ -131,14 +130,14 @@ def __init__(self, config_file : str): if not transport_type and not protocol_version: raise ValueError('Missing Transport / Protocol Version') - - + if not transport_type and protocol_version: #get transport from protocol settings... todo need to make a quick function instead of this + protocolSettings : protocol_settings = protocol_settings(protocol_version) if not transport_type and not protocolSettings.transport: raise ValueError('Missing Transport') - + if not transport_type: transport_type = protocolSettings.transport @@ -194,26 +193,26 @@ def run(self): if not transport.connected: transport.connect() #reconnect else: #transport is connected - + info = transport.read_data() if not info: continue - + #todo. broadcast option if transport.bridge: for to_transport in self.__transports: if to_transport.transport_name == transport.bridge: to_transport.write_data(info, transport) break - + except Exception as err: traceback.print_exc() self.__log.error(err) - time.sleep(0.7) #change this in future. probably reduce to allow faster reads. + time.sleep(0.7) #change this in future. probably reduce to allow faster reads. + - diff --git a/pytests/test_example_config.py b/pytests/test_example_config.py index 37bf4b3..3f36f27 100644 --- a/pytests/test_example_config.py +++ b/pytests/test_example_config.py @@ -1,12 +1,11 @@ import sys import os -import pytest #move up a folder for tests -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from protocol_gateway import CustomConfigParser def test_example_cfg(): parser = CustomConfigParser() - parser.read("config.cfg.example") \ No newline at end of file + parser.read("config.cfg.example") diff --git a/pytests/test_protocol_settings.py b/pytests/test_protocol_settings.py index dba99ac..c682432 100644 --- a/pytests/test_protocol_settings.py +++ b/pytests/test_protocol_settings.py @@ -5,7 +5,7 @@ #move up a folder for tests -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from classes.protocol_settings import protocol_settings @@ -22,5 +22,4 @@ @pytest.mark.parametrize("protocol", protocols) def test_protocol_setting(protocol : str): print(protocol) - protocolSettings : protocol_settings = protocol_settings(protocol) - + protocolSettings : protocol_settings = protocol_settings(protocol) # noqa: F841 diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..af3ee57 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +ruff diff --git a/requirements.txt b/requirements.txt index 40e2ac2..2158c18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ pymodbus==3.7.0 paho-mqtt pyserial -python-can \ No newline at end of file +python-can diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..ddf2766 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,22 @@ +line-length = 88 +indent-width = 4 +target-version = "py311" + +[lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. + +select = ["E", "F", "W"] +ignore = ["E501"] + +#select = ["ALL"] +#ignore = ["C901", "EM101", "TRY003", "E501", "D", "G004", "N", "S105", +# "DTZ007", "S301", "RUF013", "PTH123", "PLW0603", "TRY400", "BLE001", +# "PLR0912", "PLR0915", "FBT001", "FBT002", "FA100", "W191" +#] + +[format] +quote-style = "double" +indent-style = "space" +line-ending = "auto" diff --git a/test.py b/test.py index 1ed177f..c383d4c 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,5 @@ +import re +import ast #pip install "python-can[gs_usb]" @@ -40,7 +42,7 @@ # The data is a 2-byte value (un16) soc_bytes = msg.data[:2] soc = int.from_bytes(soc_bytes, byteorder='big', signed=False) / 100.0 - + print(f"State of Charge: {soc:.2f}%") if msg.arbitration_id == 0x0355: @@ -61,10 +63,6 @@ quit() -import re -import ast - - # Define the register string register = "x4642.[ 1 + ((( [battery 1 number of cells] *2 )+ (1~[battery 1 number of temperature] *2)) ) ]" @@ -115,7 +113,7 @@ def evaluate_ranges(expression): return results -def evaluate_expression(expression): +def evaluate_expression(expression): # Define a regular expression pattern to match "maths" var_pattern = re.compile(r'\[(?P.*?)\]') @@ -124,15 +122,15 @@ def replace_vars(match): try: maths = match.group("maths") maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them - + # Parse the expression safely tree = ast.parse(maths, mode='eval') # Evaluate the expression end_value = eval(compile(tree, filename='', mode='eval')) - + return str(end_value) - except : + except Exception: return match.group(0) # Replace variables with their values diff --git a/tools/apply_common_names_to_csv.py b/tools/apply_common_names_to_csv.py index f7e2141..17535a9 100644 --- a/tools/apply_common_names_to_csv.py +++ b/tools/apply_common_names_to_csv.py @@ -28,7 +28,7 @@ continue if not row['variable name'].strip(): #if empty, apply name - row['variable name'] = common_names[row['documented name']] + row['variable name'] = common_names[row['documented name']] print(row['documented name'] + ' -> ' + common_names[row['documented name']]) new_csv.append(row) continue @@ -37,7 +37,7 @@ # Write data to the output CSV with open(save_path, 'w', newline='') as outfile: writer = csv.DictWriter(outfile, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(new_csv) + writer.writeheader() + writer.writerows(new_csv) -print("saved to "+save_path) \ No newline at end of file +print("saved to "+save_path) diff --git a/tools/get_common_names_from_csv.py b/tools/get_common_names_from_csv.py index b96cbef..f2ba72e 100644 --- a/tools/get_common_names_from_csv.py +++ b/tools/get_common_names_from_csv.py @@ -17,7 +17,7 @@ with open(path, newline='') as csvfile: # Create a CSV reader object reader = csv.DictReader(csvfile, delimiter=';') #compensate for openoffice - + # Iterate over each row in the CSV file for row in reader: if row['variable name'] and row['documented name']: @@ -37,4 +37,4 @@ with open(save_path, 'w') as file: file.write(json_str) -print("saved to "+save_path) \ No newline at end of file +print("saved to "+save_path) diff --git a/tools/list_to_json.py b/tools/list_to_json.py index 38eed52..edd10e2 100644 --- a/tools/list_to_json.py +++ b/tools/list_to_json.py @@ -19,7 +19,7 @@ pairs = user_input.split("=") else: pairs = user_input.split() - + # Create a dictionary from the key-value pairs result = {} @@ -34,7 +34,7 @@ else: key, value = pair.split(":") - + result[key.strip()] = value.strip() # Convert the dictionary to JSON