From 5b87ca6197445a5c1b7f35dc6c4e7d62627df38c Mon Sep 17 00:00:00 2001 From: Kyle Coble <53625197+Hackerman342@users.noreply.github.com> Date: Wed, 21 Feb 2024 13:43:16 -0500 Subject: [PATCH] Add `PendantState` python class, proto, & example (#182) --- protos/farm_ng/canbus/amiga_v6.proto | 12 ++ py/examples/file_reader_can/main.py | 7 +- py/examples/pendant_state/README.md | 3 + py/examples/pendant_state/main.py | 52 +++++++ py/examples/pendant_state/requirements.txt | 1 + py/examples/pendant_state/service_config.json | 15 ++ py/examples/vehicle_twist/main.py | 6 +- py/farm_ng/canbus/packet.py | 146 ++++++++++++++++-- py/tests/test_packet.py | 115 ++++++++++++++ 9 files changed, 338 insertions(+), 19 deletions(-) create mode 100644 py/examples/pendant_state/README.md create mode 100644 py/examples/pendant_state/main.py create mode 100644 py/examples/pendant_state/requirements.txt create mode 100644 py/examples/pendant_state/service_config.json create mode 100644 py/tests/test_packet.py diff --git a/protos/farm_ng/canbus/amiga_v6.proto b/protos/farm_ng/canbus/amiga_v6.proto index 0abadd67..20291a83 100644 --- a/protos/farm_ng/canbus/amiga_v6.proto +++ b/protos/farm_ng/canbus/amiga_v6.proto @@ -76,3 +76,15 @@ message AmigaV6CanbusState { double battery_charge_level = 3; bool last_send_error = 4; // True if the last send on the canbus failed } + +message PendantState { + /* + State of the wired Pendant connected to the Amiga. + */ + uint32 node_id = 1; // Node ID of sender + double stamp = 2; // Received time, in host monotonic clock (seconds) + + double x = 3; // Joystick X axis + double y = 4; // Joystick Y axis + uint32 buttons = 5; // Buttons pressed, bit masked. See PendantButtons in packet.py +} diff --git a/py/examples/file_reader_can/main.py b/py/examples/file_reader_can/main.py index 714ef033..0f5127da 100644 --- a/py/examples/file_reader_can/main.py +++ b/py/examples/file_reader_can/main.py @@ -14,11 +14,10 @@ from __future__ import annotations import argparse -from typing import Optional from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.packet import AmigaTpdo1 -from farm_ng.canbus.packet import parse_amiga_tpdo1_proto +from farm_ng.canbus.packet import DASHBOARD_NODE_ID from farm_ng.core.events_file_reader import build_events_dict from farm_ng.core.events_file_reader import EventLogPosition from farm_ng.core.events_file_reader import EventsFileReader @@ -47,8 +46,8 @@ def main(file_name: str) -> None: msg: canbus_pb2.RawCanbusMessage for msg in sample.messages: - tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(msg) - if tpdo1 is not None: + if msg.id == AmigaTpdo1.cob_id + DASHBOARD_NODE_ID: + tpdo1: AmigaTpdo1 = AmigaTpdo1.from_raw_canbus_message(msg) print(tpdo1) assert reader.close() diff --git a/py/examples/pendant_state/README.md b/py/examples/pendant_state/README.md new file mode 100644 index 00000000..dd176a7c --- /dev/null +++ b/py/examples/pendant_state/README.md @@ -0,0 +1,3 @@ +# Amiga Pendant State example + +URL: https://amiga.farm-ng.com/docs/examples/pendant_state diff --git a/py/examples/pendant_state/main.py b/py/examples/pendant_state/main.py new file mode 100644 index 00000000..d430c06d --- /dev/null +++ b/py/examples/pendant_state/main.py @@ -0,0 +1,52 @@ +# Copyright (c) farm-ng, inc. +# +# Licensed under the Amiga Development Kit License (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import argparse +import asyncio +from pathlib import Path + +from farm_ng.canbus import amiga_v6_pb2 +from farm_ng.canbus.packet import PendantButtons +from farm_ng.canbus.packet import PendantState +from farm_ng.core.event_client import EventClient +from farm_ng.core.event_service_pb2 import EventServiceConfig +from farm_ng.core.events_file_reader import proto_from_json_file + + +async def main(service_config_path: Path) -> None: + """Run the canbus service client. + + Args: + service_config_path (Path): The path to the canbus service config. + """ + + config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig()) + async for event, msg in EventClient(config).subscribe(config.subscriptions[0], decode=True): + if not isinstance(msg, amiga_v6_pb2.PendantState): + print(f"Unexpected message type: {type(msg)}") + continue + pendant_state: PendantState = PendantState.from_proto(msg) + print(f"Received pendant state: {pendant_state}") + for button in PendantButtons: + if pendant_state.is_button_pressed(button): + print(f"Button {button.name} is pressed") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(prog="Stream PendantState messages the canbus service.") + parser.add_argument("--service-config", type=Path, required=True, help="The canbus service config.") + args = parser.parse_args() + + asyncio.run(main(args.service_config)) diff --git a/py/examples/pendant_state/requirements.txt b/py/examples/pendant_state/requirements.txt new file mode 100644 index 00000000..7da67160 --- /dev/null +++ b/py/examples/pendant_state/requirements.txt @@ -0,0 +1 @@ +farm-ng-amiga diff --git a/py/examples/pendant_state/service_config.json b/py/examples/pendant_state/service_config.json new file mode 100644 index 00000000..6980373e --- /dev/null +++ b/py/examples/pendant_state/service_config.json @@ -0,0 +1,15 @@ +{ + "name": "canbus", + "port": 6001, + "host": "localhost", + "log_level": "INFO", + "subscriptions": [ + { + "uri": { + "path": "/pendant", + "query": "service_name=canbus" + }, + "every_n": 1 + } + ] +} diff --git a/py/examples/vehicle_twist/main.py b/py/examples/vehicle_twist/main.py index 6c86426f..59078a60 100644 --- a/py/examples/vehicle_twist/main.py +++ b/py/examples/vehicle_twist/main.py @@ -57,10 +57,10 @@ def update_twist_with_key_press(twist: Twist2d, key: int): async def main(service_config_path: Path) -> None: - """Run the camera service client. + """Run the canbus service client. Args: - service_config_path (Path): The path to the camera service config. + service_config_path (Path): The path to the canbus service config. """ # Initialize the command to send twist = Twist2d() @@ -68,7 +68,7 @@ async def main(service_config_path: Path) -> None: # open a window to capture key presses cv2.namedWindow('Virtual Keyboard') - # create a client to the camera service + # create a client to the canbus service config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig()) client: EventClient = EventClient(config) diff --git a/py/farm_ng/canbus/packet.py b/py/farm_ng/canbus/packet.py index ea5fab8b..a25a08b5 100644 --- a/py/farm_ng/canbus/packet.py +++ b/py/farm_ng/canbus/packet.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import logging import time from enum import IntEnum from struct import pack @@ -30,6 +31,19 @@ SDK_NODE_ID = 0x2A +class PendantButtons(IntEnum): + """Bit field for pendant buttons.""" + + PAUSE = 0x01 # Square + BRAKE = 0x02 # Circle + PTO = 0x04 # Triangle + CRUISE = 0x08 # Cross (X) + LEFT = 0x10 # D-pad left + UP = 0x20 # D-pad up + RIGHT = 0x40 # D-pad right + DOWN = 0x80 # D-pad down + + class AmigaControlState(IntEnum): """State of the Amiga vehicle control unit (VCU)""" @@ -100,6 +114,9 @@ def make_amiga_rpdo1_proto( Uses the AmigaRpdo1 structure and formatting, that can be sent directly to the canbus service to be formatted and send on the CAN bus. + WARNING: Deprecated starting with farm-ng-amiga v2.3.0 + Please use AmigaRpdo1.to_raw_canbus_message() instead. + Args: state_req: State of the Amiga vehicle control unit (VCU). cmd_speed: Command speed in meters per second. @@ -110,6 +127,9 @@ def make_amiga_rpdo1_proto( Returns: An instance of a canbus_pb2.RawCanbusMessage. """ + logging.warning("make_amiga_rpdo1_proto is deprecated as of v2.3.0") + logging.warning("Use AmigaRpdo1.to_raw_canbus_message() instead.") + # TODO: add some checkers, or make python CHECK_API return canbus_pb2.RawCanbusMessage( id=AmigaRpdo1.cob_id + DASHBOARD_NODE_ID, @@ -164,12 +184,8 @@ def encode(self): def decode(self, data): """Decodes CAN message data and populates the values of the class.""" if len(data) == 5: - # TODO: Instate warning when dashboard fw v0.1.9 is released - # warnings.warn( - # "Please update dashboard firmware to >= v0.1.9." - # " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9", - # stacklevel=2, - # ) + logging.warning("Please update dashboard firmware to >= v0.1.9 to use updated AmigaRpdo1 packet format.") + (self.state_req, cmd_speed, cmd_ang_rate) = unpack(self.legacy_format, data) self.cmd_speed = cmd_speed / 1000.0 self.cmd_ang_rate = cmd_ang_rate / 1000.0 @@ -183,6 +199,13 @@ def __str__(self): self.state_req, self.cmd_speed, self.cmd_ang_rate ) + " Command PTO bits 0x{:x} Command h-bridge bits 0x{:x}".format(self.pto_bits, self.hbridge_bits) + def to_raw_canbus_message(self) -> canbus_pb2.RawCanbusMessage: + """Packs the class data into a canbus_pb2.RawCanbusMessage. + + Returns: An instance of a canbus_pb2.RawCanbusMessage. + """ + return canbus_pb2.RawCanbusMessage(id=self.cob_id + DASHBOARD_NODE_ID, data=self.encode()) + class AmigaTpdo1(Packet): """State, speed, and angular rate of the Amiga vehicle control unit (VCU). @@ -225,12 +248,8 @@ def encode(self): def decode(self, data): """Decodes CAN message data and populates the values of the class.""" if len(data) == 5: - # TODO: Instate warning when dashboard fw v0.1.9 is released - # warnings.warn( - # "Please update dashboard firmware to >= v0.1.9." - # " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9", - # stacklevel=2, - # ) + logging.warning("Please update dashboard firmware to >= v0.1.9 to use updated AmigaTpdo1 packet format.") + (self.state, meas_speed, meas_ang_rate) = unpack(self.legacy_format, data) self.meas_speed = meas_speed / 1000.0 self.meas_ang_rate = meas_ang_rate / 1000.0 @@ -274,6 +293,24 @@ def from_proto(cls, proto: amiga_v6_pb2.AmigaTpdo1) -> AmigaTpdo1: obj.hbridge_bits = proto.hbridge_bits return obj + @classmethod + def from_raw_canbus_message(cls, message: canbus_pb2.RawCanbusMessage) -> AmigaTpdo1: + """Parses a canbus_pb2.RawCanbusMessage. + + IFF the message came from the dashboard and contains AmigaTpdo1 structure, + formatting, and cobid. + + Args: + message: The raw canbus message to parse. + + Returns: + The parsed AmigaTpdo1 message. + """ + if message.id != cls.cob_id + DASHBOARD_NODE_ID: + raise ValueError(f"Expected message from dashboard, received message from node {message.id}") + + return cls.from_can_data(message.data, stamp=message.stamp) + def __str__(self): return "AMIGA TPDO1 Amiga state {} Measured speed {:0.3f} Measured angular rate {:0.3f} @ time {}".format( self.state, self.meas_speed, self.meas_ang_rate, self.stamp.stamp @@ -286,12 +323,18 @@ def parse_amiga_tpdo1_proto(message: canbus_pb2.RawCanbusMessage) -> AmigaTpdo1 IFF the message came from the dashboard and contains AmigaTpdo1 structure, formatting, and cobid. + WARNING: Deprecated starting with farm-ng-amiga v2.3.0 + Please use AmigaTpdo1.from_raw_canbus_message() instead. + Args: message: The raw canbus message to parse. Returns: The parsed AmigaTpdo1 message, or None if the message is not a valid AmigaTpdo1 message. """ + logging.warning("parse_amiga_tpdo1_proto is deprecated as of v2.3.0") + logging.warning("Use AmigaTpdo1.from_raw_canbus_message() instead.") + # TODO: add some checkers, or make python CHECK_API if message.id != AmigaTpdo1.cob_id + DASHBOARD_NODE_ID: return None @@ -365,3 +408,82 @@ def __str__(self): self.id, self.status.name, self.rpm, self.voltage, self.current, self.temperature, self.timestamp ) ) + + +class PendantState(Packet): + """State of the Pendant (joystick position & pressed buttons)""" + + scale = 32767 + format = " [left, right] + self.y = y # [-1.0, 1.0] => [reverse, forward] + self.buttons = buttons + self.stamp_packet(time.monotonic()) + + def encode(self): + """Returns the data contained by the class encoded as CAN message data.""" + return pack(self.format, int(self.x * self.scale), int(self.y * self.scale), self.buttons) + + def decode(self, data): + """Decodes CAN message data and populates the values of the class.""" + + (xi, yi, self.buttons) = unpack(self.format, data) + self.x = xi / self.scale + self.y = yi / self.scale + + def to_proto(self) -> amiga_v6_pb2.PendantState: + """Packs the class data into a PendantState proto message. + + Returns: An instance of a PendantState proto. + """ + return amiga_v6_pb2.PendantState( + node_id=PENDANT_NODE_ID, stamp=self.stamp.stamp, x=self.x, y=self.y, buttons=self.buttons + ) + + @classmethod + def from_proto(cls, proto: amiga_v6_pb2.PendantState) -> PendantState: + """Creates an instance of the class from a proto message. + + Args: + proto: The PendantState proto message to parse. + """ + # Check for correct proto + if not isinstance(proto, amiga_v6_pb2.PendantState): + raise TypeError(f"Expected amiga_v6_pb2.PendantState proto, received {type(proto)}") + + obj = cls() + obj.stamp_packet(proto.stamp) + obj.x = proto.x + obj.y = proto.y + obj.buttons = proto.buttons + return obj + + @classmethod + def from_raw_canbus_message(cls, message: canbus_pb2.RawCanbusMessage) -> PendantState: + """Parses a canbus_pb2.RawCanbusMessage. + + IFF the message came from the pendant and contains PendantState structure, + formatting, and cobid. + + Args: + message: The raw canbus message to parse. + + Returns: + The parsed PendantState message. + """ + if message.id != cls.cob_id + PENDANT_NODE_ID: + raise ValueError(f"Expected message from pendant, received message from node {message.id}") + + return cls.from_can_data(message.data, stamp=message.stamp) + + def is_button_pressed(self, button: PendantButtons) -> bool: + """Returns True if the button is pressed.""" + if not isinstance(button, PendantButtons): + raise TypeError(f"Expected PendantButtons, received {type(button)}") + return bool(self.buttons & button) + + def __str__(self): + return "x {:0.3f} y {:0.3f} buttons {}".format(self.x, self.y, self.buttons) diff --git a/py/tests/test_packet.py b/py/tests/test_packet.py new file mode 100644 index 00000000..ca5c1f6d --- /dev/null +++ b/py/tests/test_packet.py @@ -0,0 +1,115 @@ +# Copyright (c) farm-ng, inc. +# +# Licensed under the Amiga Development Kit License (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + +import pytest +from farm_ng.canbus import amiga_v6_pb2 +from farm_ng.canbus import canbus_pb2 +from farm_ng.canbus.packet import AmigaControlState +from farm_ng.canbus.packet import AmigaRpdo1 +from farm_ng.canbus.packet import AmigaTpdo1 +from farm_ng.canbus.packet import MotorControllerStatus +from farm_ng.canbus.packet import MotorState +from farm_ng.canbus.packet import PendantButtons +from farm_ng.canbus.packet import PendantState + + +@pytest.fixture +def amiga_rpdo1_instance(): + return AmigaRpdo1( + state_req=AmigaControlState.STATE_AUTO_ACTIVE, cmd_speed=2.0, cmd_ang_rate=1.0, pto_bits=0x0A, hbridge_bits=0x05 + ) + + +@pytest.fixture +def amiga_tpdo1_instance(): + return AmigaTpdo1( + state=AmigaControlState.STATE_AUTO_READY, meas_speed=3.0, meas_ang_rate=1.5, pto_bits=0x01, hbridge_bits=0x02 + ) + + +@pytest.fixture +def pendant_state_instance(): + return PendantState(x=0.5, y=-0.5, buttons=PendantButtons.PAUSE | PendantButtons.BRAKE) + + +@pytest.fixture +def motor_state_instance(): + return MotorState( + id=1, + status=MotorControllerStatus.RUN, + rpm=1000, + voltage=24.0, + current=1.5, + temperature=25, + timestamp=time.monotonic(), + ) + + +def test_amiga_rpdo1_encode_decode(amiga_rpdo1_instance): + encoded = amiga_rpdo1_instance.encode() + decoded_instance = AmigaRpdo1() + decoded_instance.decode(encoded) + + assert amiga_rpdo1_instance.state_req == decoded_instance.state_req + assert amiga_rpdo1_instance.cmd_speed == decoded_instance.cmd_speed + assert amiga_rpdo1_instance.cmd_ang_rate == decoded_instance.cmd_ang_rate + assert amiga_rpdo1_instance.pto_bits == decoded_instance.pto_bits + assert amiga_rpdo1_instance.hbridge_bits == decoded_instance.hbridge_bits + + +def test_amiga_tpdo1_to_from_proto(amiga_tpdo1_instance): + proto = amiga_tpdo1_instance.to_proto() + assert isinstance(proto, amiga_v6_pb2.AmigaTpdo1) + + from_proto_instance = AmigaTpdo1.from_proto(proto) + assert from_proto_instance.state == amiga_tpdo1_instance.state + assert from_proto_instance.meas_speed == amiga_tpdo1_instance.meas_speed + assert from_proto_instance.meas_ang_rate == amiga_tpdo1_instance.meas_ang_rate + assert from_proto_instance.pto_bits == amiga_tpdo1_instance.pto_bits + assert from_proto_instance.hbridge_bits == amiga_tpdo1_instance.hbridge_bits + + +def test_pendant_state_encode_decode(pendant_state_instance): + encoded = pendant_state_instance.encode() + decoded_instance = PendantState() + decoded_instance.decode(encoded) + + # Approx for floating point comparison after float -> int -> float conversion + assert pendant_state_instance.x == pytest.approx(decoded_instance.x, rel=1e-3) + assert pendant_state_instance.y == pytest.approx(decoded_instance.y, rel=1e-3) + assert pendant_state_instance.buttons == decoded_instance.buttons + + # Test the is_button_pressed method + assert pendant_state_instance.is_button_pressed(PendantButtons.PAUSE) + assert pendant_state_instance.is_button_pressed(PendantButtons.BRAKE) + assert not pendant_state_instance.is_button_pressed(PendantButtons.CRUISE) + assert not pendant_state_instance.is_button_pressed(PendantButtons.LEFT) + + +def test_motor_state_to_from_proto(motor_state_instance): + proto = motor_state_instance.to_proto() + assert isinstance(proto, canbus_pb2.MotorState) + + from_proto_instance = MotorState.from_proto(proto) + assert from_proto_instance.id == motor_state_instance.id + assert from_proto_instance.status == motor_state_instance.status + assert from_proto_instance.rpm == motor_state_instance.rpm + assert from_proto_instance.voltage == motor_state_instance.voltage + assert from_proto_instance.current == motor_state_instance.current + assert from_proto_instance.temperature == motor_state_instance.temperature + + +if __name__ == "__main__": + pytest.main()