From aa22946a7fb3eb87514062e5896fef847473f3fa Mon Sep 17 00:00:00 2001 From: jess-moss Date: Tue, 17 Sep 2024 16:31:08 -0500 Subject: [PATCH] Removed configuration and port finding code from classes no longer necessary. --- .../common/robot_devices/motors/dynamixel.py | 146 ------------------ .../common/robot_devices/motors/feetech.py | 146 ------------------ lerobot/scripts/configure_motor.py | 2 +- lerobot/scripts/find_motors_bus_port.py | 8 +- .../templates/visualize_dataset_template.html | 2 +- 5 files changed, 6 insertions(+), 298 deletions(-) diff --git a/lerobot/common/robot_devices/motors/dynamixel.py b/lerobot/common/robot_devices/motors/dynamixel.py index e3f64685c..fb4663250 100644 --- a/lerobot/common/robot_devices/motors/dynamixel.py +++ b/lerobot/common/robot_devices/motors/dynamixel.py @@ -4,7 +4,6 @@ import time import traceback from copy import deepcopy -from pathlib import Path import numpy as np import tqdm @@ -235,35 +234,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name): ) -def find_available_ports(): - ports = [] - for path in Path("/dev").glob("tty*"): - ports.append(str(path)) - return ports - - -def find_port(): - print("Finding all available ports for the DynamixelMotorsBus.") - ports_before = find_available_ports() - print(ports_before) - - print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.") - input() - - time.sleep(0.5) - ports_after = find_available_ports() - ports_diff = list(set(ports_before) - set(ports_after)) - - if len(ports_diff) == 1: - port = ports_diff[0] - print(f"The port of this DynamixelMotorsBus is '{port}'") - print("Reconnect the usb cable.") - elif len(ports_diff) == 0: - raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).") - else: - raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).") - - class TorqueMode(enum.Enum): ENABLED = 1 DISABLED = 0 @@ -377,11 +347,6 @@ def connect(self): self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) - if not self.are_motors_configured(): - raise OSError( - "Motors are not configured. Please use configure_motors.py to configure motors before continuing." - ) - def reconnect(self): self.port_handler = PortHandler(self.port) self.packet_handler = PacketHandler(PROTOCOL_VERSION) @@ -398,112 +363,6 @@ def are_motors_configured(self): print(e) return False - def configure_motors(self): - # TODO(rcadene): This script assumes motors follow the X_SERIES baudrates - # TODO(rcadene): Refactor this function with intermediate high-level functions - - print("Scanning all baudrates and motor indices") - all_baudrates = set(X_SERIES_BAUDRATE_TABLE.values()) - ids_per_baudrate = {} - for baudrate in all_baudrates: - self.set_bus_baudrate(baudrate) - present_ids = self.find_motor_indices() - if len(present_ids) > 0: - ids_per_baudrate[baudrate] = present_ids - print(f"Motor indices detected: {ids_per_baudrate}") - print() - - possible_baudrates = list(ids_per_baudrate.keys()) - possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist}) - untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices)) - - # Connect successively one motor to the chain and write a unique random index for each - for i in range(len(self.motors)): - self.disconnect() - input( - "1. Unplug the power cord\n" - "2. Plug/unplug minimal number of cables to only have the first " - f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n" - "3. Re-plug the power cord\n" - "Press Enter to continue..." - ) - print() - self.reconnect() - - if i > 0: - try: - self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID") - except ConnectionError: - print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.") - input("Press Enter to continue...") - print() - self.reconnect() - - print("Scanning possible baudrates and motor indices") - motor_found = False - for baudrate in possible_baudrates: - self.set_bus_baudrate(baudrate) - present_ids = self.find_motor_indices(possible_ids) - if len(present_ids) == 1: - present_idx = present_ids[0] - print(f"Detected motor with index {present_idx}") - - if baudrate != BAUDRATE: - print(f"Setting its baudrate to {BAUDRATE}") - baudrate_idx = list(X_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE) - - # The write can fail, so we allow retries - for _ in range(NUM_WRITE_RETRY): - self.write_with_motor_ids( - self.motor_models, present_idx, "Baud_Rate", baudrate_idx - ) - time.sleep(0.5) - self.set_bus_baudrate(BAUDRATE) - try: - present_baudrate_idx = self.read_with_motor_ids( - self.motor_models, present_idx, "Baud_Rate" - ) - except ConnectionError: - print("Failed to write baudrate. Retrying.") - self.set_bus_baudrate(baudrate) - continue - break - else: - raise - - if present_baudrate_idx != baudrate_idx: - raise OSError("Failed to write baudrate.") - - print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})") - self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i]) - - present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID") - if present_idx != untaken_ids[i]: - raise OSError("Failed to write index.") - - motor_found = True - break - elif len(present_ids) > 1: - raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.") - - if not motor_found: - raise OSError( - "No motor found, but one new motor expected. Verify power cord is plugged in and retry." - ) - print() - - print(f"Setting expected motor indices: {self.motor_indices}") - self.set_bus_baudrate(BAUDRATE) - self.write_with_motor_ids( - self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices - ) - print() - - if (self.read("ID") != self.motor_indices).any(): - raise OSError("Failed to write motors indices.") - - print("Configuration is done!") - def find_motor_indices(self, possible_ids=None): if possible_ids is None: possible_ids = range(MAX_ID_RANGE) @@ -968,8 +827,3 @@ def disconnect(self): def __del__(self): if getattr(self, "is_connected", False): self.disconnect() - - -if __name__ == "__main__": - # Helper to find the usb port associated to all your DynamixelMotorsBus. - find_port() diff --git a/lerobot/common/robot_devices/motors/feetech.py b/lerobot/common/robot_devices/motors/feetech.py index 53c92950f..0043ec8a0 100644 --- a/lerobot/common/robot_devices/motors/feetech.py +++ b/lerobot/common/robot_devices/motors/feetech.py @@ -4,7 +4,6 @@ import time import traceback from copy import deepcopy -from pathlib import Path import numpy as np import tqdm @@ -208,35 +207,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name): ) -def find_available_ports(): - ports = [] - for path in Path("/dev").glob("tty*"): - ports.append(str(path)) - return ports - - -def find_port(): - print("Finding all available ports for the FeetechMotorsBus.") - ports_before = find_available_ports() - print(ports_before) - - print("Remove the usb cable from your FeetechMotorsBus and press Enter when done.") - input() - - time.sleep(0.5) - ports_after = find_available_ports() - ports_diff = list(set(ports_before) - set(ports_after)) - - if len(ports_diff) == 1: - port = ports_diff[0] - print(f"The port of this FeetechMotorsBus is '{port}'") - print("Reconnect the usb cable.") - elif len(ports_diff) == 0: - raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).") - else: - raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).") - - class TorqueMode(enum.Enum): ENABLED = 1 DISABLED = 0 @@ -350,11 +320,6 @@ def connect(self): self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) - if not self.are_motors_configured(): - raise OSError( - "Motors are not configured. Please use configure_motors.py to configure motors before continuing." - ) - def reconnect(self): self.port_handler = PortHandler(self.port) self.packet_handler = PacketHandler(PROTOCOL_VERSION) @@ -371,112 +336,6 @@ def are_motors_configured(self): print(e) return False - def configure_motors(self): - # TODO(rcadene): This script assumes motors follow the X_SERIES baudrates - # TODO(rcadene): Refactor this function with intermediate high-level functions - - print("Scanning all baudrates and motor indices") - all_baudrates = set(SCS_SERIES_BAUDRATE_TABLE.values()) - ids_per_baudrate = {} - for baudrate in all_baudrates: - self.set_bus_baudrate(baudrate) - present_ids = self.find_motor_indices() - if len(present_ids) > 0: - ids_per_baudrate[baudrate] = present_ids - print(f"Motor indices detected: {ids_per_baudrate}") - print() - - possible_baudrates = list(ids_per_baudrate.keys()) - possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist}) - untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices)) - - # Connect successively one motor to the chain and write a unique random index for each - for i in range(len(self.motors)): - self.disconnect() - input( - "1. Unplug the power cord\n" - "2. Plug/unplug minimal number of cables to only have the first " - f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n" - "3. Re-plug the power cord\n" - "Press Enter to continue..." - ) - print() - self.reconnect() - - if i > 0: - try: - self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID") - except ConnectionError: - print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.") - input("Press Enter to continue...") - print() - self.reconnect() - - print("Scanning possible baudrates and motor indices") - motor_found = False - for baudrate in possible_baudrates: - self.set_bus_baudrate(baudrate) - present_ids = self.find_motor_indices(possible_ids) - if len(present_ids) == 1: - present_idx = present_ids[0] - print(f"Detected motor with index {present_idx}") - - if baudrate != BAUDRATE: - print(f"Setting its baudrate to {BAUDRATE}") - baudrate_idx = list(SCS_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE) - - # The write can fail, so we allow retries - for _ in range(NUM_WRITE_RETRY): - self.write_with_motor_ids( - self.motor_models, present_idx, "Baud_Rate", baudrate_idx - ) - time.sleep(0.5) - self.set_bus_baudrate(BAUDRATE) - try: - present_baudrate_idx = self.read_with_motor_ids( - self.motor_models, present_idx, "Baud_Rate" - ) - except ConnectionError: - print("Failed to write baudrate. Retrying.") - self.set_bus_baudrate(baudrate) - continue - break - else: - raise - - if present_baudrate_idx != baudrate_idx: - raise OSError("Failed to write baudrate.") - - print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})") - self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i]) - - present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID") - if present_idx != untaken_ids[i]: - raise OSError("Failed to write index.") - - motor_found = True - break - elif len(present_ids) > 1: - raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.") - - if not motor_found: - raise OSError( - "No motor found, but one new motor expected. Verify power cord is plugged in and retry." - ) - print() - - print(f"Setting expected motor indices: {self.motor_indices}") - self.set_bus_baudrate(BAUDRATE) - self.write_with_motor_ids( - self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices - ) - print() - - if (self.read("ID") != self.motor_indices).any(): - raise OSError("Failed to write motors indices.") - - print("Configuration is done!") - def find_motor_indices(self, possible_ids=None): if possible_ids is None: possible_ids = range(MAX_ID_RANGE) @@ -941,8 +800,3 @@ def disconnect(self): def __del__(self): if getattr(self, "is_connected", False): self.disconnect() - - -if __name__ == "__main__": - # Helper to find the usb port associated to all your FeetechMotorsBus. - find_port() diff --git a/lerobot/scripts/configure_motor.py b/lerobot/scripts/configure_motor.py index 827849b24..ec50e0ad5 100644 --- a/lerobot/scripts/configure_motor.py +++ b/lerobot/scripts/configure_motor.py @@ -55,7 +55,7 @@ def configure_motor(brand, model, motor_idx_des, baudrate_des): try: motor_bus.connect() print(f"Connected on port {motor_bus.port}") - except Exception as e: + except OSError as e: print(f"Error occurred when connecting to the motor bus: {e}") return diff --git a/lerobot/scripts/find_motors_bus_port.py b/lerobot/scripts/find_motors_bus_port.py index 3e547b25f..51ef6d41c 100644 --- a/lerobot/scripts/find_motors_bus_port.py +++ b/lerobot/scripts/find_motors_bus_port.py @@ -10,11 +10,11 @@ def find_available_ports(): def find_port(): - print("Finding all available ports for the DynamixelMotorsBus.") + print("Finding all available ports for the MotorsBus.") ports_before = find_available_ports() print(ports_before) - print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.") + print("Remove the usb cable from your MotorsBus and press Enter when done.") input() time.sleep(0.5) @@ -23,7 +23,7 @@ def find_port(): if len(ports_diff) == 1: port = ports_diff[0] - print(f"The port of this DynamixelMotorsBus is '{port}'") + print(f"The port of this MotorsBus is '{port}'") print("Reconnect the usb cable.") elif len(ports_diff) == 0: raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).") @@ -32,5 +32,5 @@ def find_port(): if __name__ == "__main__": - # Helper to find the usb port associated to all your DynamixelMotorsBus. + # Helper to find the usb port associated to all your MotorsBus. find_port() diff --git a/lerobot/templates/visualize_dataset_template.html b/lerobot/templates/visualize_dataset_template.html index 0a49fd5b6..3a845015d 100644 --- a/lerobot/templates/visualize_dataset_template.html +++ b/lerobot/templates/visualize_dataset_template.html @@ -241,7 +241,7 @@

if(!canPlayVideos){ this.videoCodecError = true; } - + // process CSV data this.videos = document.querySelectorAll('video'); this.video = this.videos[0];