Skip to content

Commit

Permalink
streamline variant processing
Browse files Browse the repository at this point in the history
  • Loading branch information
semuadmin committed May 20, 2024
1 parent a04d4ec commit c04ea3f
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 349 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,7 @@ The UBX protocol is principally defined in the modules `ubxtypes_*.py` as a seri

Repeating attribute names are parsed with a two-digit suffix (svid_01, svid_02, etc.). Nested repeating groups are supported. See CFG-VALGET, MON-SPAN, NAV-PVT, NAV-SAT and RXM-RLM by way of examples.

In most cases, a UBX message's content (payload) is uniquely defined by its class, id and mode; accommodating the message simply requires the addition of an appropriate dictionary entry to the relevant `ubxtypes_*.py` module(s).

However, there are a handful of message types which have multiple possible payload definitions for the same class, id and mode. These exceptional message types require dedicated routines in `ubxmessage.py` which examine elements of the payload itself in order to determine the appropriate dictionary definition. This currently applies to the following message types: CFG-NMEA, NAV-RELPOSNED, RXM-PMP, RXM-PMREQ, RXM-RLM, TIM-VCOCAL.
In most cases, a UBX message's content (payload) is uniquely defined by its class, id and mode; accommodating the message simply requires the addition of an appropriate dictionary entry to the relevant `ubxtypes_*.py` module(s). However, there are a handful of message types which have multiple possible payload variants for the same class, id and mode. These exceptional message types are handled in `ubxvariants.py`, to which any additional variants should be added.

---
## <a name="troubleshoot">Troubleshooting</a>
Expand Down
8 changes: 8 additions & 0 deletions docs/pyubx2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ pyubx2.ubxtypes\_set module
:undoc-members:
:show-inheritance:

pyubx2.ubxvariants module
-------------------------

.. automodule:: pyubx2.ubxvariants
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------

Expand Down
4 changes: 2 additions & 2 deletions src/pyubx2/socket_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _recv(self) -> bool:
try:
data = self._socket.recv(self._bufsize)
if len(data) == 0:
return False
return False # pragma: no cover
self._buffer += data
except (OSError, TimeoutError):
return False
Expand Down Expand Up @@ -100,7 +100,7 @@ def readline(self) -> bytes:
if line[-1:] == b"\n": # LF
break
else:
break
break # pragma: no cover

return line

Expand Down
316 changes: 18 additions & 298 deletions src/pyubx2/ubxmessage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""
ubxmessage.py
Main UBX Message Protocol Class.
Created on 26 Sep 2020
Expand All @@ -17,6 +19,7 @@
import pyubx2.ubxtypes_get as ubg
import pyubx2.ubxtypes_poll as ubp
import pyubx2.ubxtypes_set as ubs
import pyubx2.ubxvariants as umv
from pyubx2.ubxhelpers import (
attsiz,
bytes2val,
Expand Down Expand Up @@ -437,7 +440,7 @@ def _get_dict(self, **kwargs) -> dict:
"""
Get payload dictionary corresponding to message mode (GET/SET/POLL)
Certain message types need special handling as alternate payload
definitions exist for the same ubxClass/ubxID.
variants exist for the same ubxClass/ubxID/mode.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
Expand All @@ -446,312 +449,29 @@ def _get_dict(self, **kwargs) -> dict:
"""

try:
if self._mode == ubt.POLL:
# CFG-TP5 POLL
if self._ubxClass == b"\x06" and self._ubxID == b"\x31":
pdict = self._get_cfgtp5_version(**kwargs)
else:
pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
msg = self._ubxClass + self._ubxID
variant = umv.VARIANTS[self._mode].get(msg, False)
if variant and msg[0] == 0x13: # MGA
pdict = variant(msg, self._mode, **kwargs)
elif variant:
pdict = variant(**kwargs)
elif self._mode == ubt.POLL:
pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
elif self._mode == ubt.SET:
# MGA SET
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
pdict = self._get_mga_version(ubt.SET, **kwargs)
# RXM-PMP SET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
pdict = self._get_rxmpmp_version(**kwargs)
# RXM-PMREQ SET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x41":
pdict = self._get_rxmpmreq_version(**kwargs)
# TIM-VCOCAL SET
elif self._ubxClass == b"\x0d" and self._ubxID == b"\x15":
pdict = self._get_timvcocal_version(**kwargs)
# CFG-DAT SET
elif self._ubxClass == b"\x06" and self._ubxID == b"\x06":
pdict = self._get_cfgdat_version(**kwargs)
else:
pdict = ubs.UBX_PAYLOADS_SET[self.identity]
pdict = ubs.UBX_PAYLOADS_SET[self.identity]
else:
# MGA GET
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
pdict = self._get_mga_version(ubt.GET, **kwargs)
# RXM-PMP GET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
pdict = self._get_rxmpmp_version(**kwargs)
# RXM-RLM GET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x59":
pdict = self._get_rxmrlm_version(**kwargs)
# CFG-NMEA GET
elif self._ubxClass == b"\x06" and self._ubxID == b"\x17":
pdict = self._get_cfgnmea_version(**kwargs)
# NAV-AOPSTATUS GET
elif self._ubxClass == b"\x01" and self._ubxID == b"\x60":
pdict = self._get_aopstatus_version(**kwargs)
# NAV-RELPOSNED GET
elif self._ubxClass == b"\x01" and self._ubxID == b"\x3C":
pdict = self._get_relposned_version(**kwargs)
# Unknown GET message, parsed to nominal definition
elif self.identity[-7:] == "NOMINAL":
if self.identity[-7:] == "NOMINAL":
pdict = {}
else:
pdict = ubg.UBX_PAYLOADS_GET[self.identity]
return pdict
except KeyError as err:
raise KeyError(
f"{err} - Check 'msgmode' keyword argument is appropriate for data stream"
) from err

def _get_cfgtp5_version(self, **kwargs) -> dict:
"""
Select appropriate CFG-TP5 POLL payload definition by checking
presence of tpIdx or payload argument.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
"""

lp = 0
if "payload" in kwargs:
lp = len(kwargs["payload"])
elif "tpIdx" in kwargs:
lp = 1
if lp == 1:
pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5-TPX"]
else:
pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5"]
return pdict

def _get_mga_version(self, mode: int, **kwargs) -> dict:
"""
Select appropriate MGA payload definition by checking
value of 'type' attribute (1st byte of payload).
:param str mode: mode (0=GET, 1=SET, 2=POLL)
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "type" in kwargs:
typ = val2bytes(kwargs["type"], ubt.U1)
elif "payload" in kwargs:
typ = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"MGA message definitions must include type or payload keyword"
)
identity = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID + typ]
if mode == ubt.SET:
pdict = ubs.UBX_PAYLOADS_SET[identity]
else:
pdict = ubg.UBX_PAYLOADS_GET[identity]
return pdict

def _get_rxmpmreq_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-PMREQ payload definition by checking
the 'version' keyword or payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

lpd = 0
if "version" in kwargs: # assume longer version
lpd = 16
elif "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"RXM-PMREQ message definitions must include version or payload keyword"
)
if lpd == 16:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ"] # long
else:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ-S"] # short
return pdict

def _get_rxmpmp_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-PMP payload definition by checking
value of 'version' attribute (1st byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "version" in kwargs:
ver = val2bytes(kwargs["version"], ubt.U1)
elif "payload" in kwargs:
ver = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"RXM-PMP message definitions must include version or payload keyword"
)
if ver == b"\x00":
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V0"]
else:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V1"]
return pdict

def _get_rxmrlm_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-RLM payload definition by checking
value of 'type' attribute (2nd byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "type" in kwargs:
typ = val2bytes(kwargs["type"], ubt.U1)
elif "payload" in kwargs:
typ = kwargs["payload"][1:2]
else:
raise ube.UBXMessageError(
"RXM-RLM message definitions must include type or payload keyword"
)
if typ == b"\x01":
pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-S"] # short
else:
pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-L"] # long
return pdict

def _get_cfgnmea_version(self, **kwargs) -> dict:
"""
Select appropriate payload definition version for older
generations of CFG-NMEA message by checking payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"CFG-NMEA message definitions must include payload keyword"
)
if lpd == 4:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAvX"]
elif lpd == 12:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAv0"]
else:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEA"]
return pdict

def _get_aopstatus_version(self, **kwargs) -> dict:
"""
Select appropriate payload definition version for older
generations of NAV-AOPSTATUS message by checking payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"NAV-AOPSTATUS message definitions must include payload keyword"
)
if lpd == 20:
pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS-L"]
else:
pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS"]
return pdict

def _get_relposned_version(self, **kwargs) -> dict:
"""
Select appropriate NAV-RELPOSNED payload definition by checking
value of 'version' attribute (1st byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

if "version" in kwargs:
ver = val2bytes(kwargs["version"], ubt.U1)
elif "payload" in kwargs:
ver = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"NAV-RELPOSNED message definitions must include version or payload keyword"
)
if ver == b"\x00":
pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED-V0"]
else:
pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED"]
return pdict

def _get_timvcocal_version(self, **kwargs) -> dict:
"""
Select appropriate TIM-VCOCAL SET payload definition by checking
the payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""

lpd = 1
typ = 0
if "type" in kwargs:
typ = kwargs["type"]
elif "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"TIM-VCOCAL SET message definitions must include type or payload keyword"
)
if lpd == 1 and typ == 0:
pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL-V0"] # stop cal
else:
pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL"] # cal
return pdict

def _get_cfgdat_version(self, **kwargs) -> dict:
"""
Select appropriate CFG-DAT SET payload definition by checking
presence of datumNum keyword or payload length of 2 bytes.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
"""

lpd = 0
if "payload" in kwargs:
lpd = len(kwargs["payload"])
if lpd == 2 or "datumNum" in kwargs:
pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT-NUM"] # datum num set
else:
pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT"] # manual datum set
return pdict
f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}"
f", mode {["GET", "SET", "POLL"][self._mode]}. "
"Check 'msgmode' setting is appropriate for data stream"
) from err

def _calc_num_repeats(
self, attd: dict, payload: bytes, offset: int, offsetend: int = 0
Expand Down
Loading

0 comments on commit c04ea3f

Please sign in to comment.