From d9226aea862db8d5287ca58627499fcf40a1bc4d Mon Sep 17 00:00:00 2001 From: Marek Sebera Date: Thu, 19 Jan 2023 14:26:16 +0100 Subject: [PATCH] LP fixed for case, where report contains nulls instead of valid int/float/date/time data --- okdmr/dmrlib/hytera/pdu/location_protocol.py | 72 +++++++++++++------ okdmr/tests/dmrlib/hytera/app/test_rrs_app.py | 9 ++- okdmr/tests/dmrlib/hytera/pdu/test_lp.py | 6 +- 3 files changed, 65 insertions(+), 22 deletions(-) diff --git a/okdmr/dmrlib/hytera/pdu/location_protocol.py b/okdmr/dmrlib/hytera/pdu/location_protocol.py index 6d154f8..dddfa3c 100644 --- a/okdmr/dmrlib/hytera/pdu/location_protocol.py +++ b/okdmr/dmrlib/hytera/pdu/location_protocol.py @@ -75,22 +75,30 @@ def __init__( direction: Union[bytes, int], ): self.data_valid: Literal["A", "V"] = data_valid - self.greenwich_time: time = ( + self.greenwich_time: Optional[time] = ( greenwich_time if isinstance(greenwich_time, time) - else time( - hour=int(greenwich_time[0:2]), - minute=int(greenwich_time[2:4]), - second=int(greenwich_time[4:6]), + else ( + time( + hour=int(greenwich_time[0:2]), + minute=int(greenwich_time[2:4]), + second=int(greenwich_time[4:6]), + ) + if len(greenwich_time.replace(b"\x00", b"")) + else None ) ) - self.greenwich_date: date = ( + self.greenwich_date: Optional[date] = ( greenwich_date if isinstance(greenwich_date, date) - else date( - day=int(greenwich_date[0:2]), - month=int(greenwich_date[2:4]), - year=2000 + int(greenwich_date[4:6]), + else ( + date( + day=int(greenwich_date[0:2]), + month=int(greenwich_date[2:4]), + year=2000 + int(greenwich_date[4:6]), + ) + if len(greenwich_date.replace(b"\x00", b"")) + else None ) ) self.north_south: Literal["N", "S"] = north_south @@ -106,9 +114,17 @@ def __init__( self.speed_knots: float = ( speed_knots if isinstance(speed_knots, float) - else float(speed_knots.decode("ascii")) + else ( + float(speed_knots.decode("ascii")) + if len(speed_knots.replace(b"\x00", b"")) + else 0 + ) + ) + self.direction: int = ( + direction + if isinstance(direction, int) + else (int(direction) if len(direction.replace(b"\x00", b"")) else 0) ) - self.direction: int = int(direction) @staticmethod def from_bytes( @@ -132,14 +148,22 @@ def from_bytes( def as_bytes(self, endian: Literal["big", "little"] = "big") -> bytes: return ( self.data_valid - + self.greenwich_time.strftime("%H%M%S") - + self.greenwich_date.strftime("%d%m%y") + + ( + "\0" * 6 + if self.greenwich_time is None + else self.greenwich_time.strftime("%H%M%S") + ) + + ( + "\0" * 6 + if self.greenwich_date is None + else self.greenwich_date.strftime("%d%m%y") + ) + self.north_south + f"{self.latitude:09.4f}" + self.east_west + f"{self.longitude:010.4f}" - + f"{self.speed_knots:03}" - + f"{self.direction:03}" + + ("\0" * 3 if self.speed_knots <= 0 else f"{self.speed_knots:03}") + + ("\0" * 3 if not self.direction else f"{self.direction:03}") ).encode("ascii") def __repr__(self) -> str: @@ -148,12 +172,20 @@ def __repr__(self) -> str: return ( ("VALID " if self.data_valid else "INVALID ") - + self.greenwich_date.strftime("%H:%M:%S ") - + self.greenwich_time.strftime("%d.%m.%Y ") + + ( + "" + if not self.greenwich_date + else self.greenwich_date.strftime("%H:%M:%S ") + ) + + ( + "" + if not self.greenwich_time + else self.greenwich_time.strftime("%d.%m.%Y ") + ) + self.north_south - + f"{round(float(_lat[:2]) + float(_lat[2:])/60.0, 8)} " + + f"{round(float(_lat[:2]) + float(_lat[2:]) / 60.0, 8)} " + self.east_west - + f"{round(float(_lon[:3]) + float(_lon[3:])/60.0, 8)} " + + f"{round(float(_lon[:3]) + float(_lon[3:]) / 60.0, 8)} " + f"speed:{round(self.speed_knots * 1.852, 5)} km/h " + f"direction:{self.direction}°" ) diff --git a/okdmr/tests/dmrlib/hytera/app/test_rrs_app.py b/okdmr/tests/dmrlib/hytera/app/test_rrs_app.py index 8a81cb0..3807381 100644 --- a/okdmr/tests/dmrlib/hytera/app/test_rrs_app.py +++ b/okdmr/tests/dmrlib/hytera/app/test_rrs_app.py @@ -112,7 +112,14 @@ def datagram_received( @param addr: @return: Tuple[(pdu was handled, by HSTRPLayer itself), (optionally parsed HSTRP object)] """ - pdu = HSTRP.from_bytes(data=data) + try: + pdu = HSTRP.from_bytes(data=data) + except: + pdu = None + self.log_error( + f"Could not decode HSTRP from received UDP datagram {data.hex()}" + ) + was_handled: bool = False was_confirmed: bool = False diff --git a/okdmr/tests/dmrlib/hytera/pdu/test_lp.py b/okdmr/tests/dmrlib/hytera/pdu/test_lp.py index 763bc4b..d59cd9d 100644 --- a/okdmr/tests/dmrlib/hytera/pdu/test_lp.py +++ b/okdmr/tests/dmrlib/hytera/pdu/test_lp.py @@ -5,11 +5,15 @@ def test_location_protocol(): msgs: Dict[str, Dict[str, any]] = { - "08a0020032000000010a2110dd0000413138333634383236313031354e343731382e383035314530313835342e34333837302e313132310b03": {} + "08a0020032000000010a2110dd0000413138333634383236313031354e343731382e383035314530313835342e34333837302e313132310b03": {}, + "08a002003200000003002337fb0000410000000000000000000000004e353030332e383737314530313432362e353330320000000000007003": {}, } for msg in msgs: msg_bytes = bytes.fromhex(msg) lp = LocationProtocol.from_bytes(msg_bytes) + if msg_bytes != lp.as_bytes(): + print(msg.lower()) + print(lp.as_bytes().hex().lower()) assert lp.as_bytes() == msg_bytes assert len(repr(lp)) > 0 print(repr(lp))