Skip to content

Commit

Permalink
Removed configuration and port finding code from classes no longer ne…
Browse files Browse the repository at this point in the history
…cessary.
  • Loading branch information
jess-moss committed Sep 17, 2024
1 parent 1c1882e commit 9c6ebce
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 298 deletions.
146 changes: 0 additions & 146 deletions lerobot/common/robot_devices/motors/dynamixel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
import traceback
from copy import deepcopy
from pathlib import Path

import numpy as np
import tqdm
Expand Down Expand Up @@ -235,35 +234,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
)


def find_available_ports():
ports = []
for path in Path("/dev").glob("tty*"):
ports.append(str(path))
return ports


def find_port():
print("Finding all available ports for the DynamixelMotorsBus.")
ports_before = find_available_ports()
print(ports_before)

print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.")
input()

time.sleep(0.5)
ports_after = find_available_ports()
ports_diff = list(set(ports_before) - set(ports_after))

if len(ports_diff) == 1:
port = ports_diff[0]
print(f"The port of this DynamixelMotorsBus is '{port}'")
print("Reconnect the usb cable.")
elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
else:
raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).")


class TorqueMode(enum.Enum):
ENABLED = 1
DISABLED = 0
Expand Down Expand Up @@ -377,11 +347,6 @@ def connect(self):

self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)

if not self.are_motors_configured():
raise OSError(
"Motors are not configured. Please use configure_motors.py to configure motors before continuing."
)

def reconnect(self):
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
Expand All @@ -398,112 +363,6 @@ def are_motors_configured(self):
print(e)
return False

def configure_motors(self):
# TODO(rcadene): This script assumes motors follow the X_SERIES baudrates
# TODO(rcadene): Refactor this function with intermediate high-level functions

print("Scanning all baudrates and motor indices")
all_baudrates = set(X_SERIES_BAUDRATE_TABLE.values())
ids_per_baudrate = {}
for baudrate in all_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices()
if len(present_ids) > 0:
ids_per_baudrate[baudrate] = present_ids
print(f"Motor indices detected: {ids_per_baudrate}")
print()

possible_baudrates = list(ids_per_baudrate.keys())
possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist})
untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices))

# Connect successively one motor to the chain and write a unique random index for each
for i in range(len(self.motors)):
self.disconnect()
input(
"1. Unplug the power cord\n"
"2. Plug/unplug minimal number of cables to only have the first "
f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n"
"3. Re-plug the power cord\n"
"Press Enter to continue..."
)
print()
self.reconnect()

if i > 0:
try:
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
print()
self.reconnect()

print("Scanning possible baudrates and motor indices")
motor_found = False
for baudrate in possible_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices(possible_ids)
if len(present_ids) == 1:
present_idx = present_ids[0]
print(f"Detected motor with index {present_idx}")

if baudrate != BAUDRATE:
print(f"Setting its baudrate to {BAUDRATE}")
baudrate_idx = list(X_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE)

# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
self.set_bus_baudrate(baudrate)
continue
break
else:
raise

if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")

print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])

present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")

motor_found = True
break
elif len(present_ids) > 1:
raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.")

if not motor_found:
raise OSError(
"No motor found, but one new motor expected. Verify power cord is plugged in and retry."
)
print()

print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()

if (self.read("ID") != self.motor_indices).any():
raise OSError("Failed to write motors indices.")

print("Configuration is done!")

def find_motor_indices(self, possible_ids=None):
if possible_ids is None:
possible_ids = range(MAX_ID_RANGE)
Expand Down Expand Up @@ -968,8 +827,3 @@ def disconnect(self):
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()


if __name__ == "__main__":
# Helper to find the usb port associated to all your DynamixelMotorsBus.
find_port()
146 changes: 0 additions & 146 deletions lerobot/common/robot_devices/motors/feetech.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
import traceback
from copy import deepcopy
from pathlib import Path

import numpy as np
import tqdm
Expand Down Expand Up @@ -208,35 +207,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
)


def find_available_ports():
ports = []
for path in Path("/dev").glob("tty*"):
ports.append(str(path))
return ports


def find_port():
print("Finding all available ports for the FeetechMotorsBus.")
ports_before = find_available_ports()
print(ports_before)

print("Remove the usb cable from your FeetechMotorsBus and press Enter when done.")
input()

time.sleep(0.5)
ports_after = find_available_ports()
ports_diff = list(set(ports_before) - set(ports_after))

if len(ports_diff) == 1:
port = ports_diff[0]
print(f"The port of this FeetechMotorsBus is '{port}'")
print("Reconnect the usb cable.")
elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
else:
raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).")


class TorqueMode(enum.Enum):
ENABLED = 1
DISABLED = 0
Expand Down Expand Up @@ -350,11 +320,6 @@ def connect(self):

self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)

if not self.are_motors_configured():
raise OSError(
"Motors are not configured. Please use configure_motors.py to configure motors before continuing."
)

def reconnect(self):
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
Expand All @@ -371,112 +336,6 @@ def are_motors_configured(self):
print(e)
return False

def configure_motors(self):
# TODO(rcadene): This script assumes motors follow the X_SERIES baudrates
# TODO(rcadene): Refactor this function with intermediate high-level functions

print("Scanning all baudrates and motor indices")
all_baudrates = set(SCS_SERIES_BAUDRATE_TABLE.values())
ids_per_baudrate = {}
for baudrate in all_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices()
if len(present_ids) > 0:
ids_per_baudrate[baudrate] = present_ids
print(f"Motor indices detected: {ids_per_baudrate}")
print()

possible_baudrates = list(ids_per_baudrate.keys())
possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist})
untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices))

# Connect successively one motor to the chain and write a unique random index for each
for i in range(len(self.motors)):
self.disconnect()
input(
"1. Unplug the power cord\n"
"2. Plug/unplug minimal number of cables to only have the first "
f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n"
"3. Re-plug the power cord\n"
"Press Enter to continue..."
)
print()
self.reconnect()

if i > 0:
try:
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
print()
self.reconnect()

print("Scanning possible baudrates and motor indices")
motor_found = False
for baudrate in possible_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices(possible_ids)
if len(present_ids) == 1:
present_idx = present_ids[0]
print(f"Detected motor with index {present_idx}")

if baudrate != BAUDRATE:
print(f"Setting its baudrate to {BAUDRATE}")
baudrate_idx = list(SCS_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE)

# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
self.set_bus_baudrate(baudrate)
continue
break
else:
raise

if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")

print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])

present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")

motor_found = True
break
elif len(present_ids) > 1:
raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.")

if not motor_found:
raise OSError(
"No motor found, but one new motor expected. Verify power cord is plugged in and retry."
)
print()

print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()

if (self.read("ID") != self.motor_indices).any():
raise OSError("Failed to write motors indices.")

print("Configuration is done!")

def find_motor_indices(self, possible_ids=None):
if possible_ids is None:
possible_ids = range(MAX_ID_RANGE)
Expand Down Expand Up @@ -941,8 +800,3 @@ def disconnect(self):
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()


if __name__ == "__main__":
# Helper to find the usb port associated to all your FeetechMotorsBus.
find_port()
Loading

0 comments on commit 9c6ebce

Please sign in to comment.