Skip to content

Commit

Permalink
LP fixed for case, where report contains nulls instead of valid int/f…
Browse files Browse the repository at this point in the history
…loat/date/time data
  • Loading branch information
smarek committed Jan 19, 2023
1 parent aeed6e9 commit d9226ae
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 22 deletions.
72 changes: 52 additions & 20 deletions okdmr/dmrlib/hytera/pdu/location_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,30 @@ def __init__(
direction: Union[bytes, int],
):
self.data_valid: Literal["A", "V"] = data_valid
self.greenwich_time: time = (
self.greenwich_time: Optional[time] = (
greenwich_time
if isinstance(greenwich_time, time)
else time(
hour=int(greenwich_time[0:2]),
minute=int(greenwich_time[2:4]),
second=int(greenwich_time[4:6]),
else (
time(
hour=int(greenwich_time[0:2]),
minute=int(greenwich_time[2:4]),
second=int(greenwich_time[4:6]),
)
if len(greenwich_time.replace(b"\x00", b""))
else None
)
)
self.greenwich_date: date = (
self.greenwich_date: Optional[date] = (
greenwich_date
if isinstance(greenwich_date, date)
else date(
day=int(greenwich_date[0:2]),
month=int(greenwich_date[2:4]),
year=2000 + int(greenwich_date[4:6]),
else (
date(
day=int(greenwich_date[0:2]),
month=int(greenwich_date[2:4]),
year=2000 + int(greenwich_date[4:6]),
)
if len(greenwich_date.replace(b"\x00", b""))
else None
)
)
self.north_south: Literal["N", "S"] = north_south
Expand All @@ -106,9 +114,17 @@ def __init__(
self.speed_knots: float = (
speed_knots
if isinstance(speed_knots, float)
else float(speed_knots.decode("ascii"))
else (
float(speed_knots.decode("ascii"))
if len(speed_knots.replace(b"\x00", b""))
else 0
)
)
self.direction: int = (
direction
if isinstance(direction, int)
else (int(direction) if len(direction.replace(b"\x00", b"")) else 0)
)
self.direction: int = int(direction)

@staticmethod
def from_bytes(
Expand All @@ -132,14 +148,22 @@ def from_bytes(
def as_bytes(self, endian: Literal["big", "little"] = "big") -> bytes:
return (
self.data_valid
+ self.greenwich_time.strftime("%H%M%S")
+ self.greenwich_date.strftime("%d%m%y")
+ (
"\0" * 6
if self.greenwich_time is None
else self.greenwich_time.strftime("%H%M%S")
)
+ (
"\0" * 6
if self.greenwich_date is None
else self.greenwich_date.strftime("%d%m%y")
)
+ self.north_south
+ f"{self.latitude:09.4f}"
+ self.east_west
+ f"{self.longitude:010.4f}"
+ f"{self.speed_knots:03}"
+ f"{self.direction:03}"
+ ("\0" * 3 if self.speed_knots <= 0 else f"{self.speed_knots:03}")
+ ("\0" * 3 if not self.direction else f"{self.direction:03}")
).encode("ascii")

def __repr__(self) -> str:
Expand All @@ -148,12 +172,20 @@ def __repr__(self) -> str:

return (
("VALID " if self.data_valid else "INVALID ")
+ self.greenwich_date.strftime("%H:%M:%S ")
+ self.greenwich_time.strftime("%d.%m.%Y ")
+ (
""
if not self.greenwich_date
else self.greenwich_date.strftime("%H:%M:%S ")
)
+ (
""
if not self.greenwich_time
else self.greenwich_time.strftime("%d.%m.%Y ")
)
+ self.north_south
+ f"{round(float(_lat[:2]) + float(_lat[2:])/60.0, 8)} "
+ f"{round(float(_lat[:2]) + float(_lat[2:]) / 60.0, 8)} "
+ self.east_west
+ f"{round(float(_lon[:3]) + float(_lon[3:])/60.0, 8)} "
+ f"{round(float(_lon[:3]) + float(_lon[3:]) / 60.0, 8)} "
+ f"speed:{round(self.speed_knots * 1.852, 5)} km/h "
+ f"direction:{self.direction}°"
)
Expand Down
9 changes: 8 additions & 1 deletion okdmr/tests/dmrlib/hytera/app/test_rrs_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ def datagram_received(
@param addr:
@return: Tuple[(pdu was handled, by HSTRPLayer itself), (optionally parsed HSTRP object)]
"""
pdu = HSTRP.from_bytes(data=data)
try:
pdu = HSTRP.from_bytes(data=data)
except:
pdu = None
self.log_error(
f"Could not decode HSTRP from received UDP datagram {data.hex()}"
)

was_handled: bool = False
was_confirmed: bool = False

Expand Down
6 changes: 5 additions & 1 deletion okdmr/tests/dmrlib/hytera/pdu/test_lp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

def test_location_protocol():
msgs: Dict[str, Dict[str, any]] = {
"08a0020032000000010a2110dd0000413138333634383236313031354e343731382e383035314530313835342e34333837302e313132310b03": {}
"08a0020032000000010a2110dd0000413138333634383236313031354e343731382e383035314530313835342e34333837302e313132310b03": {},
"08a002003200000003002337fb0000410000000000000000000000004e353030332e383737314530313432362e353330320000000000007003": {},
}
for msg in msgs:
msg_bytes = bytes.fromhex(msg)
lp = LocationProtocol.from_bytes(msg_bytes)
if msg_bytes != lp.as_bytes():
print(msg.lower())
print(lp.as_bytes().hex().lower())
assert lp.as_bytes() == msg_bytes
assert len(repr(lp)) > 0
print(repr(lp))

0 comments on commit d9226ae

Please sign in to comment.