Skip to content

Commit

Permalink
Set up foundation for subjugator_missions migration
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniParr committed Jan 8, 2025
1 parent 98fdcf3 commit eae714e
Show file tree
Hide file tree
Showing 8 changed files with 1,166 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/subjugator/command/subjugator_missions/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>subjugator_missions</name>
<version>0.0.0</version>
<description>Holds all of subjugator's missions from 2025 and up</description>
<maintainer email="daniel27parra@gmail.com">Daniel Parra</maintainer>
<license>TODO: License declaration</license>

<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>gazebo_msgs</depend>

<test_depend>ament_copyright</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions src/subjugator/command/subjugator_missions/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/subjugator_missions
[install]
install_scripts=$base/lib/subjugator_missions
26 changes: 26 additions & 0 deletions src/subjugator/command/subjugator_missions/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from setuptools import find_packages, setup

package_name = 'subjugator_missions'

setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='daniel',
maintainer_email='daniel27parra@gmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'move = subjugator_missions.move:main'
],
},
)
Empty file.
264 changes: 264 additions & 0 deletions src/subjugator/command/subjugator_missions/subjugator_missions/move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
#!/usr/bin/env python3
import os

import rospkg
import yaml
from gazebo_msgs.msg import ModelState, ModelStates
from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3
from mil_misc_tools import ArgumentParserException, ThrowingArgumentParser
from ros_alarms import TxAlarmBroadcaster

from .sub_singleton import SubjuGatorMission

UNITS = {"m": 1, "ft": 0.3048, "yard": 0.9144, "rad": 1, "deg": 0.0174533}
SHORTHAND = {
"f": "strafe_forward",
"b": "strafe_backward",
"l": "strafe_left",
"r": "strafe_right",
"rf": "forward",
"rb": "backward",
"rl": "left",
"rr": "right",
"yl": "yaw_left",
"yr": "yaw_right",
"pu": "pitch_up",
"pd": "pitch_down",
"d": "down",
"u": "up",
}


class Move(SubjuGatorMission):
@classmethod
def decode_parameters(cls, parameters):
argv = parameters.split()
try:
return cls.parser.parse_args(argv)
except ArgumentParserException as e:
if e.message.startswith("unrecognized arguments"):
print(
'Note: If you are attempting to teleport to negative x value, use -- to separate tp arguments from actual command arguments. (ie, "submove tp -- -1,-2,3")',
)
raise e

@classmethod
async def setup(cls):
parser = ThrowingArgumentParser(
description="Command Line Mission Runner",
usage="Pass any pose editor commands with an arguments. \n\t\
forward 1 (moves forward 1 meter) \n\t\
backward 2ft (moves backward 2 feet)\
\n\t forward 1 backward 2ft\
(moves forward 1 meter then from there moves backward 2 feet), ",
)
parser.add_argument(
"commands",
type=str,
nargs="*",
help="Pose editor commands to run each followed by an argument to pass to the command\
(distance or angle usually). \
Optionally a unit can be added if a non-standard unit is desired.",
)
parser.add_argument(
"-s",
"--speed",
type=float,
default=0.2,
help="How fast to execute the move. (m/s)",
)
parser.add_argument(
"-z",
"--zrp",
action="store_true",
help="Make end goal have zero roll and pitch.",
)
parser.add_argument(
"-b",
"--blind",
action="store_true",
help="Do not check if waypoint is safe.",
)
cls.parser = parser

async def run(self, args):
commands = args.commands
# Split into commands and arguments, every other index
arguments = commands[1::2]
commands = commands[0::2]
for i in range(len(commands)):
command = commands[i]
argument = arguments[i]
self.send_feedback("Waiting for odom...")
await self.tx_pose()
self.send_feedback("Odom found!")
action_kwargs = {"speed": float(args.speed), "blind": bool(args.blind)}

if command == "custom":
# Let the user input custom commands, the eval may be dangerous
# so do away with that at some point.
self.send_feedback(f"Moving with the command: {argument}")
res = await eval(f"self.move.{argument}.go()")

elif command in ["tp", "teleport"]:
try:
rospack = rospkg.RosPack()
config_file = os.path.join(
rospack.get_path("subjugator_gazebo"),
"config",
"teleport_locs.yaml",
)
f = yaml.safe_load(open(config_file))
if len(arguments) > 1:
# Command only takes in one string so to prevent this
# command from flowing over into movement we break
# before it proceeds.
self.send_feedback("Error, more than one argument detected.")
break
else:
try:
x = float(argument.split(",")[0])
y = float(argument.split(",")[1])
z = float(argument.split(",")[2])
# Assumption is if we make it this far, we have successfully
# bound the previous three coordinates.
# The below would fail if we entered a location name instead of coords
# but we should have caught by this point.
# This is to catch anything over 3 coordinates. If
# only two were given then we would also error out
# above.
if len(argument.split(",")) != 3:
self.send_feedback("Incorrect number of coordinates")
break
except IndexError:
self.send_feedback("Incorrect number of coordinates")
break
except ValueError:
try:
if argument in ("list", "l"):
self.send_feedback("Available TP locations:")
for k in f:
print(" * " + k)
break
argz = f[argument]
x = float(argz.split(" ")[0])
y = float(argz.split(" ")[1])
z = float(argz.split(" ")[2])
except LookupError:
# This means we did not find the saved location
# referenced by the argument.
self.send_feedback(
"TP location not found, check input.",
)
break

# Get whether to teleport sub8, sub9, or both
sub = self.nh.subscribe("/gazebo/model_states", ModelStates)
models = []
async with sub:
msg: ModelStates = await sub.get_next_message()
models = [
name for name in msg.name if name.startswith("sub")
]
# Sometimes you need to sleep in order to get the thing to publish
# Apparently there is a latency when you set a publisher and it needs to actually hook into it.
# As an additional note, given the way we do trajectory in the sim, we must kill sub to prevent
# the trajectory from overriding our teleport and
# bringing it back to its expected position.
ab = TxAlarmBroadcaster(self.nh, "kill", node_name="kill")
await ab.raise_alarm(
problem_description="TELEPORTING: KILLING SUB",
severity=5,
)
await self.nh.sleep(1)
state_set_pub = self.nh.advertise(
"/gazebo/set_model_state",
ModelState,
)
async with state_set_pub:
# Wait for Gazebo to listen for message
while "/gazebo" not in state_set_pub.get_connections():
await self.nh.sleep(0.1)
for model in models:
self.send_feedback(
f"Teleporting {model} to ({x}, {y}, {z})",
)
modelstate = ModelState(
model_name=model,
pose=Pose(
position=Point(x, y, z),
orientation=Quaternion(0, 0, 0, 1),
),
twist=Twist(
linear=Vector3(0, 0, 0),
angular=Vector3(0, 0, 0),
),
reference_frame="world",
)
state_set_pub.publish(modelstate)
await self.nh.sleep(1)
await ab.clear_alarm()

except KeyboardInterrupt:
# Catches a ctrl-c situation and ends the program. Break
# will just bring the program to its natural conclusion.
break

elif command in ["zrp", "level_off", "zpr"]:
self.send_feedback("Zeroing roll and pitch")
res = await self.move.zero_roll_and_pitch().go(**action_kwargs)

elif command == "stop":
self.send_feedback("Stopping...")
if args.zrp:
res = (
await self.move.forward(0)
.zero_roll_and_pitch()
.go(**action_kwargs)
)
else:
res = await self.move.forward(0).go(**action_kwargs)

else:
# Get the command from shorthand if it's there
command = SHORTHAND.get(command, command)
movement = getattr(self.move(), command)

trans_move = (
command[:3] != "yaw"
and command[:5] != "pitch"
and command[:4] != "roll"
)
unit = "m" if trans_move else "rad"

amount = argument
# See if there's a non standard unit at the end of the argument
if not argument[-1].isdigit():
last_digit_index = [
i for i, c in enumerate(argument) if c.isdigit()
][-1] + 1
amount = argument[:last_digit_index]
unit = argument[last_digit_index:]

extra = "and zeroing pitch and roll" if args.zrp else ""
self.send_feedback(f"{command}ing {amount}{unit} {extra}")
bad_unit = UNITS.get(unit, "BAD")
if bad_unit == "BAD":
self.send_feedback("BAD UNIT")
break
goal = movement(float(amount) * UNITS.get(unit, 1))
if args.zrp:
res = await self.go(goal.zero_roll_and_pitch(), **action_kwargs)
else:
res = await self.go(goal, **action_kwargs)
self.send_feedback(
f"Result of {command}ing: {res.error or 'No error (successful).'}",
)
return "All movements complete."

def main():
print('Hi from subjugator_missions.')


if __name__ == '__main__':
main()
Loading

0 comments on commit eae714e

Please sign in to comment.