From 7221f39c3791331ad553cc6b5d86bef3c6b9ad06 Mon Sep 17 00:00:00 2001 From: semuadmin <28569967+semuadmin@users.noreply.github.com> Date: Mon, 20 May 2024 21:48:24 +0100 Subject: [PATCH] minor performance tweaks --- src/pyubx2/ubxmessage.py | 168 ++++++++++++++++++++------------------- src/pyubx2/ubxreader.py | 8 -- 2 files changed, 88 insertions(+), 88 deletions(-) diff --git a/src/pyubx2/ubxmessage.py b/src/pyubx2/ubxmessage.py index f3aa27f..f4a72f0 100644 --- a/src/pyubx2/ubxmessage.py +++ b/src/pyubx2/ubxmessage.py @@ -14,12 +14,7 @@ import struct -import pyubx2.exceptions as ube -import pyubx2.ubxtypes_core as ubt -import pyubx2.ubxtypes_get as ubg -import pyubx2.ubxtypes_poll as ubp -import pyubx2.ubxtypes_set as ubs -import pyubx2.ubxvariants as umv +from pyubx2.exceptions import UBXMessageError, UBXTypeError from pyubx2.ubxhelpers import ( attsiz, bytes2val, @@ -34,6 +29,29 @@ nomval, val2bytes, ) +from pyubx2.ubxtypes_core import ( + CH, + GET, + POLL, + SCALROUND, + SET, + U1, + U2, + U4, + UBX_CLASSES, + UBX_HDR, + UBX_MSGIDS, + X1, + X2, + X4, + X6, + X8, + X24, +) +from pyubx2.ubxtypes_get import UBX_PAYLOADS_GET +from pyubx2.ubxtypes_poll import UBX_PAYLOADS_POLL +from pyubx2.ubxtypes_set import UBX_PAYLOADS_SET +from pyubx2.ubxvariants import VARIANTS class UBXMessage: @@ -45,7 +63,6 @@ def __init__( ubxID, msgmode: int, parsebitfield: bool = True, - scaling: bool = True, **kwargs, ): """Constructor. @@ -62,7 +79,6 @@ def __init__( :param object msgID: message ID as str, int or byte :param int msgmode: message mode (0=GET, 1=SET, 2=POLL) :param bool parsebitfield: parse bitfields ('X' type attributes) Y/N - :param bool scaling: apply scale factors Y/N :param kwargs: optional payload keyword arguments :raises: UBXMessageError """ @@ -73,12 +89,10 @@ def __init__( self._payload = b"" self._length = b"" self._checksum = b"" - self._parsebf = parsebitfield # parsing bitfields Y/N? - self._scaling = scaling # apply scale factors Y/N? - if msgmode not in (ubt.GET, ubt.SET, ubt.POLL): - raise ube.UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2") + if msgmode not in (GET, SET, POLL): + raise UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2") # accommodate different formats of msgClass and msgID if isinstance(ubxClass, str) and isinstance( @@ -126,7 +140,7 @@ def _do_attributes(self, **kwargs): TypeError, ValueError, ) as err: - raise ube.UBXTypeError( + raise UBXTypeError( ( f"Incorrect type for attribute '{anam}' " f"in {['GET', 'SET', 'POLL'][self._mode]} message " @@ -134,7 +148,7 @@ def _do_attributes(self, **kwargs): ) ) from err except (OverflowError,) as err: - raise ube.UBXTypeError( + raise UBXTypeError( ( f"Overflow error for attribute '{anam}' " f"in {['GET', 'SET', 'POLL'][self._mode]} message " @@ -163,7 +177,7 @@ def _set_attribute( adef, tuple ): # repeating group of attributes or subdefined bitfield numr, _ = adef - if numr in (ubt.X1, ubt.X2, ubt.X4, ubt.X6, ubt.X8, ubt.X24): # bitfield + if numr in (X1, X2, X4, X6, X8, X24): # bitfield if self._parsebf: # if we're parsing bitfields (offset, index) = self._set_attribute_bitfield( adef, offset, index, **kwargs @@ -201,8 +215,8 @@ def _set_attribute_group( # if CFG-VALGET or CFG-VALSET message, use dedicated method to # parse as configuration key value pairs if self._ubxClass == b"\x06" and ( - (self._ubxID == b"\x8b" and self._mode == ubt.GET) - or (self._ubxID == b"\x8a" and self._mode == ubt.SET) + (self._ubxID == b"\x8b" and self._mode == GET) + or (self._ubxID == b"\x8a" and self._mode == SET) ): self._set_attribute_cfgval(offset, **kwargs) else: @@ -217,7 +231,7 @@ def _set_attribute_group( if ( self._ubxClass == b"\x10" and self._ubxID == b"\x02" - and self._mode == ubt.SET + and self._mode == SET ): if getattr(self, "calibTtagValid", 0): gsiz += 1 @@ -256,7 +270,7 @@ def _set_attribute_single( # if attribute is scaled ares = 1 - if isinstance(adef, list) and self._scaling: + if isinstance(adef, list): ares = adef[1] # attribute resolution (i.e. scaling factor) adef = adef[0] # attribute definition @@ -267,7 +281,7 @@ def _set_attribute_single( anami += f"_{i:02d}" # determine attribute size (bytes) - if adef == ubt.CH: # variable length string + if adef == CH: # variable length string asiz = len(self._payload) else: asiz = attsiz(adef) @@ -279,7 +293,7 @@ def _set_attribute_single( if ares == 1: val = bytes2val(valb, adef) else: - val = round(bytes2val(valb, adef) * ares, ubt.SCALROUND) + val = round(bytes2val(valb, adef) * ares, SCALROUND) else: # if individual keyword has been provided, # set to provided value, else set to @@ -293,14 +307,11 @@ def _set_attribute_single( if anami[0:3] == "_HP": # high precision component of earlier attribute # add standard and high precision values in a single attribute - setattr( - self, anami[3:], round(getattr(self, anami[3:]) + val, ubt.SCALROUND) - ) + setattr(self, anami[3:], round(getattr(self, anami[3:]) + val, SCALROUND)) else: setattr(self, anami, val) - offset += asiz - return offset + return offset + asiz def _set_attribute_bitfield( self, atyp: str, offset: int, index: list, **kwargs @@ -336,11 +347,10 @@ def _set_attribute_bitfield( ) # update payload - offset += bsiz if "payload" not in kwargs: self._payload += bitfield.to_bytes(bsiz, "little") - return (offset, index) + return (offset + bsiz, index) def _set_attribute_bits( self, @@ -375,16 +385,14 @@ def _set_attribute_bits( atts = attsiz(keyt) # determine flag size in bits if "payload" in kwargs: - mask = pow(2, atts) - 1 - val = (bitfield >> bfoffset) & mask + val = (bitfield >> bfoffset) & ((1 << atts) - 1) else: val = kwargs.get(keyr, 0) bitfield = bitfield | (val << bfoffset) if key[0:8] != "reserved": # don't bother to set reserved bits setattr(self, keyr, val) - bfoffset += atts - return (bitfield, bfoffset) + return (bitfield, bfoffset + atts) def _set_attribute_cfgval(self, offset: int, **kwargs): """ @@ -401,7 +409,7 @@ def _set_attribute_cfgval(self, offset: int, **kwargs): if "payload" in kwargs: self._payload = kwargs["payload"] else: - raise ube.UBXMessageError( + raise UBXMessageError( "CFG-VALGET message definitions must include payload keyword" ) cfglen = len(self._payload[offset:]) @@ -427,14 +435,11 @@ def _do_len_checksum(self): """ Calculate and format payload length and checksum as bytes.""" - if self._payload is None: - self._length = val2bytes(0, ubt.U2) - self._checksum = calc_checksum(self._ubxClass + self._ubxID + self._length) - else: - self._length = val2bytes(len(self._payload), ubt.U2) - self._checksum = calc_checksum( - self._ubxClass + self._ubxID + self._length + self._payload - ) + payload = b"" if self._payload is None else self._payload + self._length = val2bytes(len(payload), U2) + self._checksum = calc_checksum( + self._ubxClass + self._ubxID + self._length + payload + ) def _get_dict(self, **kwargs) -> dict: """ @@ -450,24 +455,24 @@ def _get_dict(self, **kwargs) -> dict: try: msg = self._ubxClass + self._ubxID - variant = umv.VARIANTS[self._mode].get(msg, False) + variant = VARIANTS[self._mode].get(msg, False) if variant and msg[0] == 0x13: # MGA pdict = variant(msg, self._mode, **kwargs) elif variant: pdict = variant(**kwargs) - elif self._mode == ubt.POLL: - pdict = ubp.UBX_PAYLOADS_POLL[self.identity] - elif self._mode == ubt.SET: - pdict = ubs.UBX_PAYLOADS_SET[self.identity] + elif self._mode == POLL: + pdict = UBX_PAYLOADS_POLL[self.identity] + elif self._mode == SET: + pdict = UBX_PAYLOADS_SET[self.identity] else: # Unknown GET message, parsed to nominal definition if self.identity[-7:] == "NOMINAL": pdict = {} else: - pdict = ubg.UBX_PAYLOADS_GET[self.identity] + pdict = UBX_PAYLOADS_GET[self.identity] return pdict except KeyError as err: - raise ube.UBXMessageError( + raise UBXMessageError( f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}" f", mode {["GET", "SET", "POLL"][self._mode]}. " "Check 'msgmode' setting is appropriate for data stream" @@ -539,11 +544,11 @@ def __str__(self) -> str: self._ubxClass == b"\x06" and self._ubxID == b"\x01" ): if att in ["clsID", "msgClass"]: - clsid = val2bytes(val, ubt.U1) - val = ubt.UBX_CLASSES.get(clsid, clsid) + clsid = val2bytes(val, U1) + val = UBX_CLASSES.get(clsid, clsid) if att == "msgID" and clsid: - msgid = val2bytes(val, ubt.U1) - val = ubt.UBX_MSGIDS.get(clsid + msgid, clsid + msgid) + msgid = val2bytes(val, U1) + val = UBX_MSGIDS.get(clsid + msgid, clsid + msgid) stg += att + "=" + str(val) if i < len(self.__dict__) - 1: stg += ", " @@ -577,7 +582,7 @@ def __setattr__(self, name, value): """ if self._immutable: - raise ube.UBXMessageError( + raise UBXMessageError( f"Object is immutable. Updates to {name} not permitted after initialisation." ) @@ -592,11 +597,14 @@ def serialize(self) -> bytes: """ - output = ubt.UBX_HDR + self._ubxClass + self._ubxID + self._length - output += ( - self._checksum if self._payload is None else self._payload + self._checksum + return ( + UBX_HDR + + self._ubxClass + + self._ubxID + + self._length + + (b"" if self._payload is None else self._payload) + + self._checksum ) - return output @property def identity(self) -> str: @@ -616,15 +624,15 @@ def identity(self) -> str: # all MGA messages except MGA-DBD need to be identified by the # 'type' attribute - the first byte of the payload if self._ubxClass == b"\x13" and self._ubxID != b"\x80": - umsg_name = ubt.UBX_MSGIDS[ + umsg_name = UBX_MSGIDS[ self._ubxClass + self._ubxID + self._payload[0:1] ] else: - umsg_name = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID] + umsg_name = UBX_MSGIDS[self._ubxClass + self._ubxID] except KeyError: # unrecognised u-blox message, parsed to UBX-NOMINAL definition - if self._ubxClass in ubt.UBX_CLASSES: # known class - cls = ubt.UBX_CLASSES[self._ubxClass] + if self._ubxClass in UBX_CLASSES: # known class + cls = UBX_CLASSES[self._ubxClass] else: # unknown class cls = "UNKNOWN" umsg_name = ( @@ -666,7 +674,7 @@ def length(self) -> int: """ - return bytes2val(self._length, ubt.U2) + return bytes2val(self._length, U2) @property def payload(self) -> bytes: @@ -712,13 +720,13 @@ def config_set(layers: int, transaction: int, cfgData: list) -> object: num = len(cfgData) if num > 64: - raise ube.UBXMessageError( + raise UBXMessageError( f"Number of configuration tuples {num} exceeds maximum of 64" ) - version = val2bytes(0 if transaction == 0 else 1, ubt.U1) - layers = val2bytes(layers, ubt.U1) - transaction = val2bytes(transaction, ubt.U1) + version = val2bytes(0 if transaction == 0 else 1, U1) + layers = val2bytes(layers, U1) + transaction = val2bytes(transaction, U1) payload = version + layers + transaction + b"\x00" lis = b"" @@ -729,11 +737,11 @@ def config_set(layers: int, transaction: int, cfgData: list) -> object: (key, att) = cfgname2key(key) # lookup keyID & attribute type else: (_, att) = cfgkey2name(key) # lookup attribute type - keyb = val2bytes(key, ubt.U4) + keyb = val2bytes(key, U4) valb = val2bytes(val, att) lis = lis + keyb + valb - return UBXMessage("CFG", "CFG-VALSET", ubt.SET, payload=payload + lis) + return UBXMessage("CFG", "CFG-VALSET", SET, payload=payload + lis) @staticmethod def config_del(layers: int, transaction: int, keys: list) -> object: @@ -755,23 +763,23 @@ def config_del(layers: int, transaction: int, keys: list) -> object: num = len(keys) if num > 64: - raise ube.UBXMessageError( + raise UBXMessageError( f"Number of configuration keys {num} exceeds maximum of 64" ) - version = val2bytes(0 if transaction == 0 else 1, ubt.U1) - layers = val2bytes(layers, ubt.U1) - transaction = val2bytes(transaction, ubt.U1) + version = val2bytes(0 if transaction == 0 else 1, U1) + layers = val2bytes(layers, U1) + transaction = val2bytes(transaction, U1) payload = version + layers + transaction + b"\x00" lis = b"" for key in keys: if isinstance(key, str): # if keyname as a string (key, _) = cfgname2key(key) # lookup keyID - keyb = val2bytes(key, ubt.U4) + keyb = val2bytes(key, U4) lis = lis + keyb - return UBXMessage("CFG", "CFG-VALDEL", ubt.SET, payload=payload + lis) + return UBXMessage("CFG", "CFG-VALDEL", SET, payload=payload + lis) @staticmethod def config_poll(layer: int, position: int, keys: list) -> object: @@ -793,20 +801,20 @@ def config_poll(layer: int, position: int, keys: list) -> object: num = len(keys) if num > 64: - raise ube.UBXMessageError( + raise UBXMessageError( f"Number of configuration keys {num} exceeds maximum of 64" ) - version = val2bytes(0, ubt.U1) - layer = val2bytes(layer, ubt.U1) - position = val2bytes(position, ubt.U2) + version = val2bytes(0, U1) + layer = val2bytes(layer, U1) + position = val2bytes(position, U2) payload = version + layer + position lis = b"" for key in keys: if isinstance(key, str): # if keyname as a string (key, _) = cfgname2key(key) # lookup keyID - keyb = val2bytes(key, ubt.U4) + keyb = val2bytes(key, U4) lis = lis + keyb - return UBXMessage("CFG", "CFG-VALGET", ubt.POLL, payload=payload + lis) + return UBXMessage("CFG", "CFG-VALGET", POLL, payload=payload + lis) diff --git a/src/pyubx2/ubxreader.py b/src/pyubx2/ubxreader.py index 07dd042..05be2c1 100644 --- a/src/pyubx2/ubxreader.py +++ b/src/pyubx2/ubxreader.py @@ -65,7 +65,6 @@ def __init__( protfilter: int = NMEA_PROTOCOL | UBX_PROTOCOL | RTCM3_PROTOCOL, quitonerror: int = ERR_LOG, parsebitfield: bool = True, - scaling: bool = True, labelmsm: int = 1, bufsize: int = 4096, parsing: bool = True, @@ -82,7 +81,6 @@ def __init__( :param int quitonerror: ERR_IGNORE (0) = ignore errors, ERR_LOG (1) = log continue, ERR_RAISE (2) = (re)raise (1) :param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1) - :param bool scaling: 1 = apply scale factors, 0 = do not apply (1) :param int labelmsm: RTCM3 MSM label type 1 = RINEX, 2 = BAND (1) :param int bufsize: socket recv buffer size (4096) :param bool parsing: True = parse data, False = don't parse data (output raw only) (True) @@ -100,7 +98,6 @@ def __init__( self._errorhandler = errorhandler self._validate = validate self._parsebf = parsebitfield - self._scaling = scaling self._labelmsm = labelmsm self._msgmode = msgmode self._parsing = parsing @@ -236,7 +233,6 @@ def _parse_ubx(self, hdr: bytes) -> tuple: validate=self._validate, msgmode=self._msgmode, parsebitfield=self._parsebf, - scaling=self._scaling, ) else: parsed_data = None @@ -286,7 +282,6 @@ def _parse_rtcm3(self, hdr: bytes) -> tuple: parsed_data = RTCMReader.parse( raw_data, validate=self._validate, - scaling=self._scaling, labelmsm=self._labelmsm, ) else: @@ -367,7 +362,6 @@ def parse( msgmode: int = GET, validate: int = VALCKSUM, parsebitfield: bool = True, - scaling: bool = True, ) -> object: """ Parse UBX byte stream to UBXMessage object. @@ -377,7 +371,6 @@ def parse( :param int validate: VALCKSUM (1) = Validate checksum, VALNONE (0) = ignore invalid checksum (1) :param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1) - :param bool scaling: 1 = apply scale factors, 0 = do not apply (1) :return: UBXMessage object :rtype: UBXMessage :raises: Exception (if data stream contains invalid data or unknown message type) @@ -432,5 +425,4 @@ def parse( msgmode, payload=payload, parsebitfield=parsebitfield, - scaling=scaling, )