Skip to content

Commit

Permalink
Add PendantState python class, proto, & example (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 authored Feb 21, 2024
1 parent bb69591 commit 5b87ca6
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 19 deletions.
12 changes: 12 additions & 0 deletions protos/farm_ng/canbus/amiga_v6.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,15 @@ message AmigaV6CanbusState {
double battery_charge_level = 3;
bool last_send_error = 4; // True if the last send on the canbus failed
}

message PendantState {
/*
State of the wired Pendant connected to the Amiga.
*/
uint32 node_id = 1; // Node ID of sender
double stamp = 2; // Received time, in host monotonic clock (seconds)

double x = 3; // Joystick X axis
double y = 4; // Joystick Y axis
uint32 buttons = 5; // Buttons pressed, bit masked. See PendantButtons in packet.py
}
7 changes: 3 additions & 4 deletions py/examples/file_reader_can/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
from __future__ import annotations

import argparse
from typing import Optional

from farm_ng.canbus import canbus_pb2
from farm_ng.canbus.packet import AmigaTpdo1
from farm_ng.canbus.packet import parse_amiga_tpdo1_proto
from farm_ng.canbus.packet import DASHBOARD_NODE_ID
from farm_ng.core.events_file_reader import build_events_dict
from farm_ng.core.events_file_reader import EventLogPosition
from farm_ng.core.events_file_reader import EventsFileReader
Expand Down Expand Up @@ -47,8 +46,8 @@ def main(file_name: str) -> None:

msg: canbus_pb2.RawCanbusMessage
for msg in sample.messages:
tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(msg)
if tpdo1 is not None:
if msg.id == AmigaTpdo1.cob_id + DASHBOARD_NODE_ID:
tpdo1: AmigaTpdo1 = AmigaTpdo1.from_raw_canbus_message(msg)
print(tpdo1)

assert reader.close()
Expand Down
3 changes: 3 additions & 0 deletions py/examples/pendant_state/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Amiga Pendant State example

URL: https://amiga.farm-ng.com/docs/examples/pendant_state
52 changes: 52 additions & 0 deletions py/examples/pendant_state/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) farm-ng, inc.
#
# Licensed under the Amiga Development Kit License (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import argparse
import asyncio
from pathlib import Path

from farm_ng.canbus import amiga_v6_pb2
from farm_ng.canbus.packet import PendantButtons
from farm_ng.canbus.packet import PendantState
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig
from farm_ng.core.events_file_reader import proto_from_json_file


async def main(service_config_path: Path) -> None:
"""Run the canbus service client.
Args:
service_config_path (Path): The path to the canbus service config.
"""

config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig())
async for event, msg in EventClient(config).subscribe(config.subscriptions[0], decode=True):
if not isinstance(msg, amiga_v6_pb2.PendantState):
print(f"Unexpected message type: {type(msg)}")
continue
pendant_state: PendantState = PendantState.from_proto(msg)
print(f"Received pendant state: {pendant_state}")
for button in PendantButtons:
if pendant_state.is_button_pressed(button):
print(f"Button {button.name} is pressed")


if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="Stream PendantState messages the canbus service.")
parser.add_argument("--service-config", type=Path, required=True, help="The canbus service config.")
args = parser.parse_args()

asyncio.run(main(args.service_config))
1 change: 1 addition & 0 deletions py/examples/pendant_state/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
farm-ng-amiga
15 changes: 15 additions & 0 deletions py/examples/pendant_state/service_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "canbus",
"port": 6001,
"host": "localhost",
"log_level": "INFO",
"subscriptions": [
{
"uri": {
"path": "/pendant",
"query": "service_name=canbus"
},
"every_n": 1
}
]
}
6 changes: 3 additions & 3 deletions py/examples/vehicle_twist/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ def update_twist_with_key_press(twist: Twist2d, key: int):


async def main(service_config_path: Path) -> None:
"""Run the camera service client.
"""Run the canbus service client.
Args:
service_config_path (Path): The path to the camera service config.
service_config_path (Path): The path to the canbus service config.
"""
# Initialize the command to send
twist = Twist2d()

# open a window to capture key presses
cv2.namedWindow('Virtual Keyboard')

# create a client to the camera service
# create a client to the canbus service
config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig())
client: EventClient = EventClient(config)

Expand Down
146 changes: 134 additions & 12 deletions py/farm_ng/canbus/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
from __future__ import annotations

import logging
import time
from enum import IntEnum
from struct import pack
Expand All @@ -30,6 +31,19 @@
SDK_NODE_ID = 0x2A


class PendantButtons(IntEnum):
"""Bit field for pendant buttons."""

PAUSE = 0x01 # Square
BRAKE = 0x02 # Circle
PTO = 0x04 # Triangle
CRUISE = 0x08 # Cross (X)
LEFT = 0x10 # D-pad left
UP = 0x20 # D-pad up
RIGHT = 0x40 # D-pad right
DOWN = 0x80 # D-pad down


class AmigaControlState(IntEnum):
"""State of the Amiga vehicle control unit (VCU)"""

Expand Down Expand Up @@ -100,6 +114,9 @@ def make_amiga_rpdo1_proto(
Uses the AmigaRpdo1 structure and formatting, that can be sent
directly to the canbus service to be formatted and send on the CAN bus.
WARNING: Deprecated starting with farm-ng-amiga v2.3.0
Please use AmigaRpdo1.to_raw_canbus_message() instead.
Args:
state_req: State of the Amiga vehicle control unit (VCU).
cmd_speed: Command speed in meters per second.
Expand All @@ -110,6 +127,9 @@ def make_amiga_rpdo1_proto(
Returns:
An instance of a canbus_pb2.RawCanbusMessage.
"""
logging.warning("make_amiga_rpdo1_proto is deprecated as of v2.3.0")
logging.warning("Use AmigaRpdo1.to_raw_canbus_message() instead.")

# TODO: add some checkers, or make python CHECK_API
return canbus_pb2.RawCanbusMessage(
id=AmigaRpdo1.cob_id + DASHBOARD_NODE_ID,
Expand Down Expand Up @@ -164,12 +184,8 @@ def encode(self):
def decode(self, data):
"""Decodes CAN message data and populates the values of the class."""
if len(data) == 5:
# TODO: Instate warning when dashboard fw v0.1.9 is released
# warnings.warn(
# "Please update dashboard firmware to >= v0.1.9."
# " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9",
# stacklevel=2,
# )
logging.warning("Please update dashboard firmware to >= v0.1.9 to use updated AmigaRpdo1 packet format.")

(self.state_req, cmd_speed, cmd_ang_rate) = unpack(self.legacy_format, data)
self.cmd_speed = cmd_speed / 1000.0
self.cmd_ang_rate = cmd_ang_rate / 1000.0
Expand All @@ -183,6 +199,13 @@ def __str__(self):
self.state_req, self.cmd_speed, self.cmd_ang_rate
) + " Command PTO bits 0x{:x} Command h-bridge bits 0x{:x}".format(self.pto_bits, self.hbridge_bits)

def to_raw_canbus_message(self) -> canbus_pb2.RawCanbusMessage:
"""Packs the class data into a canbus_pb2.RawCanbusMessage.
Returns: An instance of a canbus_pb2.RawCanbusMessage.
"""
return canbus_pb2.RawCanbusMessage(id=self.cob_id + DASHBOARD_NODE_ID, data=self.encode())


class AmigaTpdo1(Packet):
"""State, speed, and angular rate of the Amiga vehicle control unit (VCU).
Expand Down Expand Up @@ -225,12 +248,8 @@ def encode(self):
def decode(self, data):
"""Decodes CAN message data and populates the values of the class."""
if len(data) == 5:
# TODO: Instate warning when dashboard fw v0.1.9 is released
# warnings.warn(
# "Please update dashboard firmware to >= v0.1.9."
# " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9",
# stacklevel=2,
# )
logging.warning("Please update dashboard firmware to >= v0.1.9 to use updated AmigaTpdo1 packet format.")

(self.state, meas_speed, meas_ang_rate) = unpack(self.legacy_format, data)
self.meas_speed = meas_speed / 1000.0
self.meas_ang_rate = meas_ang_rate / 1000.0
Expand Down Expand Up @@ -274,6 +293,24 @@ def from_proto(cls, proto: amiga_v6_pb2.AmigaTpdo1) -> AmigaTpdo1:
obj.hbridge_bits = proto.hbridge_bits
return obj

@classmethod
def from_raw_canbus_message(cls, message: canbus_pb2.RawCanbusMessage) -> AmigaTpdo1:
"""Parses a canbus_pb2.RawCanbusMessage.
IFF the message came from the dashboard and contains AmigaTpdo1 structure,
formatting, and cobid.
Args:
message: The raw canbus message to parse.
Returns:
The parsed AmigaTpdo1 message.
"""
if message.id != cls.cob_id + DASHBOARD_NODE_ID:
raise ValueError(f"Expected message from dashboard, received message from node {message.id}")

return cls.from_can_data(message.data, stamp=message.stamp)

def __str__(self):
return "AMIGA TPDO1 Amiga state {} Measured speed {:0.3f} Measured angular rate {:0.3f} @ time {}".format(
self.state, self.meas_speed, self.meas_ang_rate, self.stamp.stamp
Expand All @@ -286,12 +323,18 @@ def parse_amiga_tpdo1_proto(message: canbus_pb2.RawCanbusMessage) -> AmigaTpdo1
IFF the message came from the dashboard and contains AmigaTpdo1 structure,
formatting, and cobid.
WARNING: Deprecated starting with farm-ng-amiga v2.3.0
Please use AmigaTpdo1.from_raw_canbus_message() instead.
Args:
message: The raw canbus message to parse.
Returns:
The parsed AmigaTpdo1 message, or None if the message is not a valid AmigaTpdo1 message.
"""
logging.warning("parse_amiga_tpdo1_proto is deprecated as of v2.3.0")
logging.warning("Use AmigaTpdo1.from_raw_canbus_message() instead.")

# TODO: add some checkers, or make python CHECK_API
if message.id != AmigaTpdo1.cob_id + DASHBOARD_NODE_ID:
return None
Expand Down Expand Up @@ -365,3 +408,82 @@ def __str__(self):
self.id, self.status.name, self.rpm, self.voltage, self.current, self.temperature, self.timestamp
)
)


class PendantState(Packet):
"""State of the Pendant (joystick position & pressed buttons)"""

scale = 32767
format = "<hhI"
cob_id = 0x180

def __init__(self, x=0, y=0, buttons=0):
self.x = x # [-1.0, 1.0] => [left, right]
self.y = y # [-1.0, 1.0] => [reverse, forward]
self.buttons = buttons
self.stamp_packet(time.monotonic())

def encode(self):
"""Returns the data contained by the class encoded as CAN message data."""
return pack(self.format, int(self.x * self.scale), int(self.y * self.scale), self.buttons)

def decode(self, data):
"""Decodes CAN message data and populates the values of the class."""

(xi, yi, self.buttons) = unpack(self.format, data)
self.x = xi / self.scale
self.y = yi / self.scale

def to_proto(self) -> amiga_v6_pb2.PendantState:
"""Packs the class data into a PendantState proto message.
Returns: An instance of a PendantState proto.
"""
return amiga_v6_pb2.PendantState(
node_id=PENDANT_NODE_ID, stamp=self.stamp.stamp, x=self.x, y=self.y, buttons=self.buttons
)

@classmethod
def from_proto(cls, proto: amiga_v6_pb2.PendantState) -> PendantState:
"""Creates an instance of the class from a proto message.
Args:
proto: The PendantState proto message to parse.
"""
# Check for correct proto
if not isinstance(proto, amiga_v6_pb2.PendantState):
raise TypeError(f"Expected amiga_v6_pb2.PendantState proto, received {type(proto)}")

obj = cls()
obj.stamp_packet(proto.stamp)
obj.x = proto.x
obj.y = proto.y
obj.buttons = proto.buttons
return obj

@classmethod
def from_raw_canbus_message(cls, message: canbus_pb2.RawCanbusMessage) -> PendantState:
"""Parses a canbus_pb2.RawCanbusMessage.
IFF the message came from the pendant and contains PendantState structure,
formatting, and cobid.
Args:
message: The raw canbus message to parse.
Returns:
The parsed PendantState message.
"""
if message.id != cls.cob_id + PENDANT_NODE_ID:
raise ValueError(f"Expected message from pendant, received message from node {message.id}")

return cls.from_can_data(message.data, stamp=message.stamp)

def is_button_pressed(self, button: PendantButtons) -> bool:
"""Returns True if the button is pressed."""
if not isinstance(button, PendantButtons):
raise TypeError(f"Expected PendantButtons, received {type(button)}")
return bool(self.buttons & button)

def __str__(self):
return "x {:0.3f} y {:0.3f} buttons {}".format(self.x, self.y, self.buttons)
Loading

0 comments on commit 5b87ca6

Please sign in to comment.