From 9465c049c27be42079e9575ea675b501dff0506b Mon Sep 17 00:00:00 2001 From: Dozgulbas Date: Mon, 25 Mar 2024 23:59:55 -0500 Subject: [PATCH] Code clean up --- src/ur_driver/ur_dashboard.py | 290 +++++++++---- src/ur_driver/ur_driver.py | 750 ++++++++++++++++++++++++++-------- 2 files changed, 781 insertions(+), 259 deletions(-) diff --git a/src/ur_driver/ur_dashboard.py b/src/ur_driver/ur_dashboard.py index 6e92c5d..61aa2af 100644 --- a/src/ur_driver/ur_dashboard.py +++ b/src/ur_driver/ur_dashboard.py @@ -1,19 +1,22 @@ +"""Interface for UR Dashboard""" + import socket from time import sleep -from paramiko import SSHClient, AutoAddPolicy, SSHException +from paramiko import AutoAddPolicy, SSHClient, SSHException from scp import SCPClient, SCPException -class UR_DASHBOARD(): + +class UR_DASHBOARD: """ - This is a python interface to communicate the UR Dashboard server. - UR can be controlled from remote by sending simple commands to the GUI over a TCP/IP socket. - The server is running on port 29999 on the robots IP address. + This is a python interface to communicate the UR Dashboard server. + UR can be controlled from remote by sending simple commands to the GUI over a TCP/IP socket. + The server is running on port 29999 on the robots IP address. Each command should be terminated by a `\n` also called a newline. """ - def __init__(self, hostname:str = "146.137.240.38", PORT: int = 29999) -> None: - """Constructor for the UR class. + def __init__(self, hostname: str = "146.137.240.38", PORT: int = 29999) -> None: + """Constructor for the UR dashboard class. :param hostname: Hostname or ip. :param port: Port. """ @@ -33,33 +36,37 @@ def connect(self) -> None: """Create a socket""" try: self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.connection.settimeout(5) # Socket will wait 5 seconds till it recieves the response - self.connection.connect((self.hostname,self.port)) + self.connection.settimeout( + 5 + ) # Socket will wait 5 seconds till it recieves the response + self.connection.connect((self.hostname, self.port)) except socket.error as err: print("UR dashboard could not establish connection") print(err) self.connection_error = True - def disconnect(self) -> None: """Close the socket""" self.connection.close() - def send_command(self, command, response_delay:float = 0.1) -> str: - - # print(">> " + command) - + def send_command(self, command: str = None, response_delay: float = 0.1) -> str: + """Constructs a command to send over the Dashboard socket + Args + command (str): Robot command. + response_delay (float): Port. + Return (str): Response message + """ try: if not self.connection: self.connect() - self.connection.sendall((command.encode("ascii") + b"\n")) - - sleep(response_delay) # Wait for response delay + self.connection.sendall((command.encode("ascii") + b"\n")) + + sleep(response_delay) # Wait for response delay response = self.connection.recv(4096).decode("utf-8") - - if response.find('Connected: Universal Robots Dashboard Server') != -1: + + if response.find("Connected: Universal Robots Dashboard Server") != -1: print("Connected: Universal Robots Dashboard Server") response = response[45:] @@ -78,41 +85,51 @@ def get_overall_robot_status(self) -> None: self.remote_control_status = self.is_in_remote_control() def initialize(self) -> None: + """Initializes the robot for Remote operation mode""" if self.connection_error: return - + self.get_overall_robot_status() - if self.safety_status == 'PROTECTIVE_STOP': + if self.safety_status == "PROTECTIVE_STOP": print("Unlocking protective stop") self.unlock_protective_stop() - elif "NORMAL" not in self.safety_status: #self.safety_status != "ROBOT_EMERGENCY_STOP" or self.safety_status != "SYSTEM_EMERGENCY_STOP": + elif ( + "NORMAL" not in self.safety_status + ): # self.safety_status != "ROBOT_EMERGENCY_STOP" or self.safety_status != "SYSTEM_EMERGENCY_STOP": print("Restarting safety") self.close_safety_popup() - self.restart_safety() + self.restart_safety() if self.operational_mode == "MANUAL": print("Operation mode is currently set to MANUAL, switching to AUTOMATIC") self.set_operational_mode("automatic") - if self.remote_control_status == False: + if self.remote_control_status is False: print("Robot is not in remote control") - - if self.robot_mode == 'RUNNING' and "NORMAL" in self.safety_status: - print('Robot is initialized') + + if self.robot_mode == "RUNNING" and "NORMAL" in self.safety_status: + print("Robot is initialized") return - - elif self.robot_mode == "POWER_OFF" or self.robot_mode == "BOOTING" or self.robot_mode == "POWER_ON" or self.robot_mode == "IDLE": + + elif ( + self.robot_mode == "POWER_OFF" + or self.robot_mode == "BOOTING" + or self.robot_mode == "POWER_ON" + or self.robot_mode == "IDLE" + ): print("Powering on the robot and releasing brakes") self.brake_release() return self.initialize() def get_robot_mode(self) -> str: - """Return the robot mode""" + """Return the robot mode + Return (str): Robot mode + """ output = self.send_command("robotmode") - output = output.split(' ') + output = output.split(" ") try: if "\n" in output[1]: return output[1].split("\n")[0] @@ -121,49 +138,84 @@ def get_robot_mode(self) -> str: except IndexError: print("Depricated output!") return output - + def quit(self) -> str: - '''Closes connection to robot''' - return self.send_command('quit') + """Closes connection to robot + + Return (str): Socket response + """ + return self.send_command("quit") def shutdown(self) -> str: - '''Shuts down and turns off robot and controller''' - return self.send_command('shutdown') + """Shuts down and turns off robot and controller + + Return (str): Socket response + """ + return self.send_command("shutdown") def power_on(self) -> str: - '''Powers on the robot arm''' - return self.send_command('power on') + """Powers on the robot arm + + Return (str): Socket response + """ + return self.send_command("power on") def power_off(self) -> str: - '''Powers off the robot arm''' - return self.send_command('power off') + """Powers off the robot arm + + Return (str): Socket response + """ + return self.send_command("power off") def brake_release(self) -> str: - '''Releases the brakes''' - output = self.send_command('brake release') + """Releases the brakes + + Return (str): Socket response + """ + output = self.send_command("brake release") sleep(20) return output def unlock_protective_stop(self) -> str: - return self.send_command('unlock protective stop') + """Unlocks the protective stop + + Return (str): Socket response + """ + return self.send_command("unlock protective stop") def close_safety_popup(self) -> str: - return self.send_command('close safety popup') + """Closes the safety popup messages + + Return (str): Socket response + """ + return self.send_command("close safety popup") def is_in_remote_control(self) -> str: - return self.send_command('is in remote control') + """Checks if robot is in remote control mode + + Return (str): True / False + """ + return self.send_command("is in remote control") def restart_safety(self) -> str: - output = self.send_command('restart safety') + """Restarts the safety + + Return (str): Socket response + """ + output = self.send_command("restart safety") sleep(10) self.disconnect() self.connect() output2 = self.brake_release() - return output - + return output + output2 + def get_safety_status(self) -> str: - output = self.send_command('safetystatus') - output = output.split(' ') + """Returns safety status + + Return (str): Safety status + """ + output = self.send_command("safetystatus") + output = output.split(" ") # print(output) try: if "\n" in output[1]: @@ -173,23 +225,61 @@ def get_safety_status(self) -> str: except IndexError: print("Depricated output!") return output - + def get_operational_mode(self) -> str: - return self.send_command('get operational mode') + """Gets operational mode + + Return (str): Operational mode (Manual/Auto) + """ + return self.send_command("get operational mode") def set_operational_mode(self, mode) -> str: - return self.send_command('set operational mode ' + mode) + """Sets the operational mode + + Args: mode(str) Either manual or auto + + Return (str): Socket response + """ + return self.send_command("set operational mode " + mode) def clear_operational_mode(self) -> str: - return self.send_command('clear operational mode') + """Clears the operational mode when the control is stuck in one mode + + Return (str): Socket response + """ + return self.send_command("clear operational mode") def popup(self, message) -> str: - return self.send_command('popup ' + message) + """Display a custom popup message + + Args: message (str) Costom message + + Return (str): Socket response + """ + return self.send_command("popup " + message) def close_popup(self) -> str: - return self.send_command('close popup') - - def transfer_program(self, local_path:str = None, ur_path:str = "/programs/", user_name:str = "root", user_password:str = "easybot") -> None: + """Closes the popup messages + + Return (str): Socket response + """ + return self.send_command("close popup") + + def transfer_program( + self, + local_path: str = None, + remote_path: str = "/programs/", + user_name: str = "root", + user_password: str = "easybot", + ) -> None: + """Trasnfers a URP program from local path to Robot computer + + Args + local_path (str): Local path to URP program + remote_path (str): Remote path on UR computer + user_name (str): User name for UR computer + user_password(str): User password for UR compurter + """ if not local_path: print("Local file was not provided!") return @@ -197,46 +287,87 @@ def transfer_program(self, local_path:str = None, ur_path:str = "/programs/", us ssh_client = SSHClient() ssh_client.load_system_host_keys() ssh_client.set_missing_host_key_policy(AutoAddPolicy()) - ssh_client.connect(hostname = self.hostname, username = user_name, password = user_password, disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) + ssh_client.connect( + hostname=self.hostname, + username=user_name, + password=user_password, + disabled_algorithms={"pubkeys": ["rsa-sha2-256", "rsa-sha2-512"]}, + ) with SCPClient(ssh_client.get_transport()) as scp: - scp.put(local_path, ur_path) + scp.put(local_path, remote_path) except SSHException as scp_err: - print("SSH error: " + scp_err) + print("SSH error: " + scp_err) except SCPException as scp_err: print("SCP error: " + scp_err) else: - print("UR program "+ local_path + " is transferred to UR onboard " + ur_path) - + print( + "UR program " + + local_path + + " is transferred to UR onboard " + + remote_path + ) + finally: scp.close() ssh_client.close() - def load_program(self, program_path:str) -> str: + def load_program(self, program_path: str) -> str: + """Load a URP program on Polyscope + + Args: program_path (str) Path to the URP program on UR computer + + Return (str): Socket response + """ return self.send_command("load " + program_path) - + def get_program_state(self) -> str: - return self.send_command('programState') - + """Gets the currently loaded program state + + Return (str): Program state (Running/Stopped/Error) + """ + return self.send_command("programState") + def get_loaded_program(self) -> str: - return self.send_command('get loaded program') - + """Gets the currently loaded program name + + Return (str): Program name + """ + return self.send_command("get loaded program") + def get_program_run_status(self) -> str: - return self.send_command('running') - + """Gets program run status + + Return (str): Status + """ + return self.send_command("running") + def run_program(self) -> str: - return self.send_command('play') - + """Runs the loaded program + + Return (str): Socket response + """ + return self.send_command("play") + def pause_program(self) -> str: - return self.send_command('pause') - + """Pauses the currently running program + + Return (str): Socket response + """ + return self.send_command("pause") + def stop_program(self) -> str: - return self.send_command('stop') + """Stops the currently running program + + Return (str): Socket response + """ + return self.send_command("stop") if __name__ == "__main__": + """Tests""" robot = UR_DASHBOARD("164.54.116.129") # robot.get_loaded_program() # robot.get_program_state() @@ -261,10 +392,9 @@ def stop_program(self) -> str: # robot.transfer_program("/home/rpl/test.urp", "/programs/katerina.urp") # robot.load_program("/programs/katerina.urp") # robot.run_program() - - + """ - Bug: Output error message breaks the client state check, causing the client to retry connection + Bug: Output error message breaks the client state check, causing the client to retry connection [ur5_client-1] [INFO] [1685573134.289546521] [ur5_client.UR5_Client_Node]: {'program_name': 'chemspeed2tecan'} [ur5_client-1] [INFO] [1685573134.290071438] [ur5_client.UR5_Client_Node]: None @@ -278,4 +408,4 @@ def stop_program(self) -> str: [ur5_client-1] [ERROR] [1685573134.745633959] [ur5_client.UR5_Client_Node]: list index out of range [ur5_client-1] [ERROR] [1685573134.746134932] [ur5_client.UR5_Client_Node]: State: ERROR [ur5_client-1] [ERROR] [1685573134.746477735] [ur5_client.UR5_Client_Node]: Robot_Mode: RUNNING Safety_Status: RUNNING - """ \ No newline at end of file + """ diff --git a/src/ur_driver/ur_driver.py b/src/ur_driver/ur_driver.py index 225a018..7b019cc 100644 --- a/src/ur_driver/ur_driver.py +++ b/src/ur_driver/ur_driver.py @@ -1,24 +1,32 @@ #!/usr/bin/env python3 +"""Interface for UR Driver""" -import threading -import socket - -from multiprocessing.connection import wait -from time import sleep +import socket from copy import deepcopy -import json -from math import radians, degrees - -from urx import Robot, RobotException -from ur_driver.ur_dashboard import UR_DASHBOARD -from ur_driver.ur_tools import * +from math import radians +from time import sleep -class Connection(): - """Connection to the UR robot to be shared within UR driver """ - def __init__(self, hostname:str = "146.137.240.38") -> None: +from urx import Robot +from ur_driver.ur_dashboard import UR_DASHBOARD +from ur_driver.ur_tools.gripper_controller import ( + FingerGripperController, +) +from ur_driver.ur_tools.ot_pipette_controller import OTPipetteController +from ur_driver.ur_tools.screwdriver_controller import ScrewdriverController +from ur_driver.ur_tools.tricontinent_pipette_controller import ( + TricontinentPipetteController, +) +from ur_driver.ur_tools.wm_tool_changer_controller import WMToolChangerController + + +class Connection: + """Connection to the UR robot to be shared within UR driver""" + + def __init__(self, hostname: str = "146.137.240.38") -> None: + """Connection class that creates a connection with the robot using URx Library""" self.hostname = hostname - + self.connection = None self.connect_ur() @@ -32,11 +40,11 @@ def connect_ur(self): self.connection = Robot(self.hostname) except socket.error: - print("Trying robot connection ...") + print("Trying robot connection {}...".format(i)) sleep(10) else: - print('Successful ur connection') + print("Successful ur connection") break def disconnect_ur(self): @@ -46,13 +54,15 @@ def disconnect_ur(self): self.connection.close() print("Robot connection is closed.") -class UR(): + +class UR: + """ + This is the primary class for UR robots. + It integrates various interfaces to achieve comprehensive control, encompassing robot initialization via the UR dashboard, + robot motion using URx, and the management of robot end-effectors such as grippers, screwdrivers, electronic pipettes, and cameras." """ - This is the primary class for UR robots. - It integrates various interfaces to achieve comprehensive control, encompassing robot initialization via the UR dashboard, - robot motion using URx, and the management of robot end-effectors such as grippers, screwdrivers, electronic pipettes, and cameras." - """ - def __init__(self, hostname:str = None, PORT: int = 29999): + + def __init__(self, hostname: str = None, PORT: int = 29999): """Constructor for the UR class. :param hostname: Hostname or ip. :param port: Port. @@ -60,30 +70,36 @@ def __init__(self, hostname:str = None, PORT: int = 29999): if not hostname: raise TypeError("Hostname cannot be None Type!") - + # super().__init__(hostname=hostname, PORT=PORT) self.hostname = hostname self.PORT = PORT - self.ur_dashboard = UR_DASHBOARD(hostname = self.hostname, PORT = self.PORT) - self.ur = Connection(hostname = self.hostname) + self.ur_dashboard = UR_DASHBOARD(hostname=self.hostname, PORT=self.PORT) + self.ur = Connection(hostname=self.hostname) self.ur_connection = self.ur.connection self.ur_connection.set_tcp((0, 0, 0, 0, 0, 0)) self.acceleration = 0.5 self.velocity = 0.5 - self.speed_ms = 0.750 - self.speed_rads = 0.750 - self.accel_mss = 1.200 + self.speed_ms = 0.750 + self.speed_rads = 0.750 + self.accel_mss = 1.200 self.accel_radss = 1.200 self.blend_radius_m = 0.001 - self.ref_frame = [0,0,0,0,0,0] + self.ref_frame = [0, 0, 0, 0, 0, 0] self.robot_current_joint_angles = None self.get_movement_state() - #TODO: get the information of what is the current tool attached to UR. Maybe keep the UR unattached after the tools were used? Run a senity check at the beginning to findout if a tool is connected - - def get_movement_state(self): + # TODO: get the information of what is the current tool attached to UR. Maybe keep the UR unattached after the tools were used? Run a senity check at the beginning to findout if a tool is connected + + def get_movement_state(self) -> str: + """Gets robot movement status by checking robot joint values. + Return (str) READY if robot is not moving + BUSY if robot is moving + """ current_location = self.ur_connection.getj() - current_location = [ '%.2f' % value for value in current_location] #rounding to 3 digits + current_location = [ + "%.2f" % value for value in current_location + ] # rounding to 3 digits # print(current_location) if self.robot_current_joint_angles == current_location: movement_state = "READY" @@ -94,50 +110,117 @@ def get_movement_state(self): return movement_state - def home(self, home_location = None): - """ - Description: Moves the robot to the home location. + def home(self, home_location: list = None) -> None: + """Moves the robot to the home location. + + Args: home_location (list) A 6 joint value location """ + print("Homing the robot...") if home_location: home_loc = home_location else: - home_loc = [-1.355567757283346, -2.5413090191283167, 1.8447726408587855, -0.891581193809845, -1.5595606009112757, 3.3403327465057373] - self.ur_connection.movej(home_loc,2,2) - # sleep(3.5) - + home_loc = [ + -1.355567757283346, + -2.5413090191283167, + 1.8447726408587855, + -0.891581193809845, + -1.5595606009112757, + 3.3403327465057373, + ] + self.ur_connection.movej(home_loc, 2, 2) print("Robot homed") - - def pick_tool(self, home, tool_loc, docking_axis = "y", payload = 0.12, tool_name:str = None): - """ - Picks up a tool using the given tool location + + def pick_tool( + self, + home: list = None, + tool_loc: list = None, + docking_axis: str = "y", + payload: float = 0.12, + tool_name: str = None, + ) -> None: + """Picks up a tool using the given tool location + Args + home (list): Home location + tool_loc (list): Tool location + docking_axis (str): Docking axis (x/y/z). Default: Y AXIS + payload (float): Weight of the end effector + tool_name (str): Name of the tool to indentify system variables """ self.ur_connection.set_payload(payload) - wingman_tool = WMToolChangerController(tool_location = tool_loc, docking_axis = docking_axis, ur = self.ur_connection, tool = tool_name) + wingman_tool = WMToolChangerController( + tool_location=tool_loc, + docking_axis=docking_axis, + ur=self.ur_connection, + tool=tool_name, + ) self.home(home) wingman_tool.pick_tool() - self.home(home) + self.home(home) + + def place_tool( + self, + home: list = None, + tool_loc: list = None, + docking_axis: str = "y", + tool_name: str = None, + ) -> None: + """Places a tool back to tool docking location + Args + home (list): Home location + tool_loc (list): Tool location + docking_axis (str): Docking axis (x/y/z). Default: Y AXIS + tool_name (str): Name of the tool to indentify system variables - def place_tool(self, home, tool_loc, docking_axis = "y", tool_name:str = None): - """ - Picks up a tool using the given tool location """ - wingman_tool = WMToolChangerController(tool_location = tool_loc, docking_axis = docking_axis, ur = self.ur_connection, tool = tool_name) + wingman_tool = WMToolChangerController( + tool_location=tool_loc, + docking_axis=docking_axis, + ur=self.ur_connection, + tool=tool_name, + ) self.home(home) wingman_tool.place_tool() - self.home(home) + self.home(home) + + def gripper_transfer( + self, + home: list = None, + source: list = None, + target: list = None, + source_approach_axis: str = None, + target_approach_axis: str = None, + source_approach_distance: float = None, + target_approach_distance: float = None, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: + """Make a transfer using the finger gripper. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + source (list): Source location + target(list): Target location + source_approach_axis (str): Source approach axis (X/Y/Z) + target_approach_axis (str): Target approach axis (X/Y/Z) + source_approach_distance (float): Source approach distance. Unit meters. + target_approach_distance(float): Target approach distance. Unit meters. + gripper_open (int): Gripper max open value (0-255) + gripper_close (int): Gripper min close value (0-255) + + """ - def gripper_transfer(self, home:list = None, source: list = None, target: list = None, source_approach_axis:str = None, target_approach_axis:str = None, source_approach_distance: float = None, target_approach_distance: float = None, gripper_open:int = None, gripper_close:int = None) -> None: - ''' - Make a transfer using the finger gripper. This function uses linear motions to perform the pick and place movements. - ''' if not source or not target: - raise Exception("Please provide both the source and target loactions to make a transfer") - + raise Exception( + "Please provide both the source and target loactions to make a transfer" + ) + self.home(home) - + try: - gripper_controller = FingerGripperController(hostname = self.hostname, ur = self.ur_connection) + gripper_controller = FingerGripperController( + hostname=self.hostname, ur=self.ur_connection + ) gripper_controller.connect_gripper() if gripper_open: @@ -145,8 +228,16 @@ def gripper_transfer(self, home:list = None, source: list = None, target: list = if gripper_close: gripper_controller.gripper_close = gripper_close - gripper_controller.transfer(home = home, source = source, target = target,source_approach_axis = source_approach_axis, target_approach_axis = target_approach_axis, source_approach_distance = source_approach_distance, target_approach_distance = target_approach_distance) - print('Finished transfer') + gripper_controller.transfer( + home=home, + source=source, + target=target, + source_approach_axis=source_approach_axis, + target_approach_axis=target_approach_axis, + source_approach_distance=source_approach_distance, + target_approach_distance=target_approach_distance, + ) + print("Finished transfer") gripper_controller.disconnect_gripper() except Exception as err: @@ -156,15 +247,35 @@ def gripper_transfer(self, home:list = None, source: list = None, target: list = gripper_controller.disconnect_gripper() self.home(home) - def gripper_screw_transfer(self, home:list = None, target:list = None, screwdriver_loc: list = None, screw_loc: list = None, screw_time:float = 9, gripper_open:int = None, gripper_close:int = None) -> None: - """ - Using custom made screwdriving solution. + def gripper_screw_transfer( + self, + home: list = None, + target: list = None, + screwdriver_loc: list = None, + screw_loc: list = None, + screw_time: float = 9, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: + """Using custom made screwdriving solution. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + target(list): Target location + screwdriver_loc (list): Location of the screwdriving bit + screw_loc (list): Location where the screwdriving will be performed + screw_time (float): Screwdriving durition + gripper_open (int): Gripper max open value (0-255) + gripper_close (int): Gripper min close value (0-255) + """ self.home(home) try: - gripper_controller = FingerGripperController(hostname = self.hostname, ur = self.ur_connection) + gripper_controller = FingerGripperController( + hostname=self.hostname, ur=self.ur_connection + ) gripper_controller.connect_gripper() if gripper_open: @@ -172,7 +283,7 @@ def gripper_screw_transfer(self, home:list = None, target:list = None, screwdriv if gripper_close: gripper_controller.gripper_close = gripper_close - gripper_controller.pick(pick_goal = screwdriver_loc) + gripper_controller.pick(pick_goal=screwdriver_loc) # # Pick screw self.home(home) @@ -188,13 +299,15 @@ def gripper_screw_transfer(self, home:list = None, target:list = None, screwdriv self.ur_connection.movel(above_target, self.acceleration, self.velocity) self.ur_connection.movel(target, 0.2, 0.2) - target_pose = [0,0,0.00021,0,0,3.14] #Setting the screw drive motion + target_pose = [0, 0, 0.00021, 0, 0, 3.14] # Setting the screw drive motion print("Screwing down") - self.ur_connection.speedl_tool(target_pose,2, screw_time) # This will perform screw driving motion for defined number of seconds - sleep(screw_time+0.5) + self.ur_connection.speedl_tool( + target_pose, 2, screw_time + ) # This will perform screw driving motion for defined number of seconds + sleep(screw_time + 0.5) - self.ur_connection.translate_tool([0,0,-0.03],0.5,0.5) + self.ur_connection.translate_tool([0, 0, -0.03], 0.5, 0.5) self.home(home) gripper_controller.place(place_goal=hex_key) @@ -206,16 +319,43 @@ def gripper_screw_transfer(self, home:list = None, target:list = None, screwdriv finally: gripper_controller.disconnect_gripper() - def gripper_unscrew(self, home:list = None, target:list = None, screwdriver_loc: list = None, screw_loc: list = None, screw_time:float = 10, gripper_open:int = None, gripper_close:int = None) -> None: + def gripper_unscrew( + self, + home: list = None, + target: list = None, + screwdriver_loc: list = None, + screw_loc: list = None, + screw_time: float = 10, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: """Perform unscrewing""" pass - - def remove_cap(self, home:list = None, source:list = None, target:list = None, gripper_open:int = None, gripper_close:int = None) -> None: - """Removes the cap""" + + def remove_cap( + self, + home: list = None, + source: list = None, + target: list = None, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: + """Remove vial cap. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + source (list): Source location + target (list): Target location + gripper_open (int): Gripper max open value (0-255) + gripper_close (int): Gripper min close value (0-255) + + """ self.home(home) try: - gripper_controller = FingerGripperController(hostname = self.hostname, ur = self.ur_connection) + gripper_controller = FingerGripperController( + hostname=self.hostname, ur=self.ur_connection + ) gripper_controller.connect_gripper() if gripper_open: gripper_controller.gripper_open = gripper_open @@ -229,14 +369,16 @@ def remove_cap(self, home:list = None, source:list = None, target:list = None, g self.ur_connection.movel(source, 0.2, 0.2) gripper_controller.close_gripper() - - target_pose = [0,0,-0.001,0,0,-3.14] #Setting the screw drive motion + + target_pose = [0, 0, -0.001, 0, 0, -3.14] # Setting the screw drive motion print("Removing cap") screw_time = 7 - self.ur_connection.speedl_tool(target_pose,2, screw_time) # This will perform screw driving motion for defined number of seconds - sleep(screw_time+0.5) - self.ur_connection.translate_tool([0,0,-0.03],0.5,0.5) - + self.ur_connection.speedl_tool( + target_pose, 2, screw_time + ) # This will perform screw driving motion for defined number of seconds + sleep(screw_time + 0.5) + self.ur_connection.translate_tool([0, 0, -0.03], 0.5, 0.5) + self.home(home) gripper_controller.place(place_goal=target) self.home(home) @@ -244,19 +386,37 @@ def remove_cap(self, home:list = None, source:list = None, target:list = None, g except Exception as err: print(err) - def place_cap(self, home:list = None, source:list = None, target:list = None, gripper_open:int = None, gripper_close:int = None) -> None: - """Places the cap back""" + def place_cap( + self, + home: list = None, + source: list = None, + target: list = None, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: + """Place vial cap. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + source (list): Source location + target (list): Target location + gripper_open (int): Gripper max open value (0-255) + gripper_close (int): Gripper min close value (0-255) + + """ self.home(home) try: - gripper_controller = FingerGripperController(hostname = self.hostname, ur = self.ur_connection) + gripper_controller = FingerGripperController( + hostname=self.hostname, ur=self.ur_connection + ) gripper_controller.connect_gripper() if gripper_open: gripper_controller.gripper_open = gripper_open if gripper_close: gripper_controller.gripper_close = gripper_close - gripper_controller.pick(pick_goal= source) + gripper_controller.pick(pick_goal=source) self.home(home) above_goal = deepcopy(target) @@ -265,30 +425,48 @@ def place_cap(self, home:list = None, source:list = None, target:list = None, gr self.ur_connection.movel(target, 0.1, 0.1) # gripper_controller.close_gripper() - - target_pose = [0,0,0.0001,0,0,2.10] #Setting the screw drive motion + + target_pose = [0, 0, 0.0001, 0, 0, 2.10] # Setting the screw drive motion print("Placing cap") screw_time = 6 - self.ur_connection.speedl_tool(target_pose,2, screw_time) # This will perform screw driving motion for defined number of seconds - sleep(screw_time+0.5) + self.ur_connection.speedl_tool( + target_pose, 2, screw_time + ) # This will perform screw driving motion for defined number of seconds + sleep(screw_time + 0.5) gripper_controller.open_gripper() - self.ur_connection.translate_tool([0,0,-0.03],0.5,0.5) + self.ur_connection.translate_tool([0, 0, -0.03], 0.5, 0.5) self.home(home) - except Exception as err: print(err) - def pick_and_flip_object(self, home:list = None, target: list = None, approach_axis:str = None, target_approach_distance: float = None, gripper_open:int = None, gripper_close:int = None) -> None: - ''' - Pick an object then flips it and puts it back to the same location - ''' + def pick_and_flip_object( + self, + home: list = None, + target: list = None, + approach_axis: str = None, + gripper_open: int = None, + gripper_close: int = None, + ) -> None: + """ + Pick an object then flips it and puts it back to the same location. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + target (list): Target location + approach_axis (str) = Object approach axis + gripper_open (int): Gripper max open value (0-255) + gripper_close (int): Gripper min close value (0-255) + + """ self.home(home) try: - gripper_controller = FingerGripperController(hostname = self.hostname, ur = self.ur_connection) + gripper_controller = FingerGripperController( + hostname=self.hostname, ur=self.ur_connection + ) gripper_controller.connect_gripper() if gripper_open: @@ -296,19 +474,19 @@ def pick_and_flip_object(self, home:list = None, target: list = None, approach_a if gripper_close: gripper_controller.gripper_close = gripper_close - gripper_controller.pick(pick_goal = target, approach_axis = approach_axis) - + gripper_controller.pick(pick_goal=target, approach_axis=approach_axis) + cur_j = self.ur_connection.getj() - rotate_j = cur_j - rotate_j[5] += radians(180) - self.ur_connection.movej(rotate_j,0.6,0.6) + rotate_j = cur_j + rotate_j[5] += radians(180) + self.ur_connection.movej(rotate_j, 0.6, 0.6) cur_l = self.ur_connection.getl() target[3] = cur_l[3] target[4] = cur_l[4] target[5] = cur_l[5] - - gripper_controller.place(place_goal = target, approach_axis = approach_axis) + + gripper_controller.place(place_goal=target, approach_axis=approach_axis) self.home(home) except Exception as er: @@ -316,49 +494,112 @@ def pick_and_flip_object(self, home:list = None, target: list = None, approach_a finally: gripper_controller.disconnect_gripper() - def robotiq_screwdriver_transfer(self, home:list = None, source: list = None, target: list = None, source_approach_axis:str = None, target_approach_axis:str = None, source_approach_distance: float = None, target_approach_distance: float = None) -> None: - ''' - Make a screw transfer using the screwdriver. This function uses linear motions to perform the pick and place movements. - ''' - + def robotiq_screwdriver_transfer( + self, + home: list = None, + source: list = None, + target: list = None, + source_approach_axis: str = None, + target_approach_axis: str = None, + source_approach_distance: float = None, + target_approach_distance: float = None, + ) -> None: + """ + Make a screw transfer using the Robotiq Screwdriver. This function uses linear motions to perform the pick and place movements. + + Args + home (list): Home location + source (list): Source location + target(list): Target location + source_approach_axis (str): Source approach axis (X/Y/Z) + target_approach_axis (str): Target approach axis (X/Y/Z) + source_approach_distance (float): Source approach distance. Unit meters. + target_approach_distance(float): Target approach distance. Unit meters. + """ self.home(home) try: - sr = ScrewdriverController(hostname = self.hostname, ur = self.ur_connection, ur_dashboard = self.ur_dashboard) + sr = ScrewdriverController( + hostname=self.hostname, + ur=self.ur_connection, + ur_dashboard=self.ur_dashboard, + ) sr.screwdriver.activate_screwdriver() - sr.transfer(source=source, target=target, source_approach_axis=source_approach_axis, target_approach_axis = target_approach_axis, source_approach_dist=source_approach_distance, target_approach_dist=target_approach_distance) + sr.transfer( + source=source, + target=target, + source_approach_axis=source_approach_axis, + target_approach_axis=target_approach_axis, + source_approach_dist=source_approach_distance, + target_approach_dist=target_approach_distance, + ) sr.screwdriver.disconnect() except Exception as err: print(err) - + self.home(home) - def pipette_transfer(self, home:list = None, tip_loc: list = None, tip_trash:list = None, source:list = None, target:list = None, volume:int = 10) -> None: - '''a + def pipette_transfer( + self, + home: list = None, + tip_loc: list = None, + tip_trash: list = None, + source: list = None, + target: list = None, + volume: int = 10, + ) -> None: + """ Make a liquid transfer using the pipette. This function uses linear motions to perform the pick and place movements. - ''' + + Args + home (list): Home location + tip_loc (list): Pipette tip location + tip_trash (list): Tip trash location + source (str): Source location + target (str): Target location + volume (int): Pipette transfer volume. Unit number of steps. Each step is 1 mL + """ if not tip_loc or not source: - raise Exception("Please provide both the source and target loactions to make a transfer") - + raise Exception( + "Please provide both the source and target loactions to make a transfer" + ) + try: - pipette = TricontinentPipetteController(hostname = self.hostname, ur = self.ur_connection, pipette_ip=self.hostname) + pipette = TricontinentPipetteController( + hostname=self.hostname, ur=self.ur_connection, pipette_ip=self.hostname + ) pipette.connect_pipette() pipette.pick_tip(tip_loc=tip_loc) self.home(home) - pipette.transfer_sample(home = home, sample_aspirate=source, sample_dispense=target, vol = volume) - pipette.eject_tip(eject_tip_loc=tip_trash,approach_axis="y") + pipette.transfer_sample( + home=home, sample_aspirate=source, sample_dispense=target, vol=volume + ) + pipette.eject_tip(eject_tip_loc=tip_trash, approach_axis="y") pipette.disconnect_pipette() print("Disconnecting from the pipette") except Exception as err: print(err) - finally: - # self.home(home) - pass - - def run_droplet(self, home, tip_loc, sample_loc, droplet_loc, tip_trash): - """Create droplet""" - pipette = OTPipetteController(ur_connection = self.ur_connection, IP = self.hostname) + def run_droplet( + self, + home: list = None, + tip_loc: list = None, + sample_loc: list = None, + droplet_loc: list = None, + tip_trash: list = None, + ) -> None: + """Run the full droplet protocol cycle + + Args + home (list): Home location + tip_loc (list): Pipette tip location + sample_loc (list): Sample location + droplet_loc (list): Location where the droplet will be hung + tip_trash (list): Pipette tip trash location + """ + pipette = OTPipetteController( + ur_connection=self.ur_connection, IP=self.hostname + ) pipette.connect_pipette() self.home(home) @@ -367,81 +608,238 @@ def run_droplet(self, home, tip_loc, sample_loc, droplet_loc, tip_trash): self.home(home) pipette.create_droplet(droplet_loc=droplet_loc) self.home(home) - pipette.empty_tip(sample_loc=sample_loc) + pipette.empty_tip(sample_loc=sample_loc) pipette.eject_tip(eject_tip_loc=tip_trash) self.home(home) pipette.disconnect_pipette() - self.ur_connection.set_tool_communication - def run_urp_program(self, transfer_file_path:str = None, program_name: str = None): + def run_urp_program(self, transfer_file_path: str = None, program_name: str = None): + """Transfers the urp programs onto the polyscope and initiates them - """Transfers the urp programs onto the polyscope and initiates them""" + Args: + trasnfer_file_path (str): Local file path + program_name (str): Name of the file + """ if not program_name: raise ValueError("Provide program name!") - - ur_program_path = "/programs/" + program_name + + ur_program_path = "/programs/" + program_name if transfer_file_path: - self.ur_dashboard.transfer_program(local_path = transfer_file_path, ur_path = ur_program_path) + self.ur_dashboard.transfer_program( + local_path=transfer_file_path, remote_path=ur_program_path + ) sleep(2) - self.ur_dashboard.load_program(program_path = ur_program_path) + self.ur_dashboard.load_program(program_path=ur_program_path) sleep(2) self.ur_dashboard.run_program() sleep(5) - + print("Running the URP program: ", program_name) time_elapsed = 0 - program_err = "" - + program_status = "BUSY" ready_status_count = 0 while program_status == "BUSY": if self.get_movement_state() == "READY": ready_status_count += 1 - if ready_status_count >=6: + if ready_status_count >= 6: program_status = "READY" else: ready_status_count = 0 sleep(3) - program_log = {"output_code":"0", "output_msg": "Successfully finished " + program_name, "output_log": "seconds_elapsed:" + str(time_elapsed)} + program_log = { + "output_code": "0", + "output_msg": "Successfully finished " + program_name, + "output_log": "seconds_elapsed:" + str(time_elapsed), + } return program_log - -if __name__ == "__main__": - pos1= [-0.22575, -0.65792, 0.39271, 2.216, 2.196, -0.043] - pos2= [0.22575, -0.65792, 0.39271, 2.216, 2.196, -0.043] - robot3 = UR(hostname="164.54.116.129") #UR3 - robot5 = UR(hostname="164.54.116.109") #UR5 - - home = [0.5431541204452515, -1.693524023095602, -0.7301170229911804, -2.2898713550963343, 1.567720651626587, -1.0230830351458948] - pipette_loc = [0.21285670041158733, 0.1548897634390196, 0.005543999069077835, 3.137978068966478, -0.009313836267512065, -0.0008972976992386885] - handE_loc = [0.3131286590368134, 0.15480163498252172, 0.005543999069077835, 3.137978068966478, -0.009313836267512065, -0.0008972976992386885] - screwdriver_loc = [0.43804370307762014, 0.15513117190281586, 0.006677533813616729, 3.137978068966478, -0.009313836267512065, -0.0008972976992386885] - - tip1 = [0.04314792894103472, -0.2860322742006418, 0.2280902599833372, 3.1380017093793624, -0.00934365687097245, -0.0006742913527073343] - sample = [0.46141141854542533, -0.060288367363232544, 0.25108778472947074, 3.1380721475655364, -0.009380578809401673, -0.0005480714914954698] - sample_dispense = [0.3171082280819746, -0.2850972337811901, 0.3411125132555506, 3.1379895509880757, -0.009383853947478633, -0.0007087863735219047] - vial_cap = [0.46318998963189156, -0.0618242346521575, 0.22044247577669074, 3.1380871312109466, -0.009283145361593024, -0.0008304449494246685] - vial_cap_holder = [0.3496362594442045, -0.19833129786349898, 0.21851956360142491, 3.1380370691898447, -0.00907338154155439, -0.0006817652068428923] - tip_trash = [0.2584365150735084, -0.29839447002022784, 0.26381819707970183, 3.1380107495494363, -0.009257765762271986, -0.0005604922095049701] - - cell_screw = [0.2874263975289342, -0.2865822322485842, 0.3180272525328063, 3.1380331051698533, -0.009437118292235473, -0.0007542998874791568] - cell_screw2 = [0.28785074639084496, -0.3117594886471939, 0.3180272525328063, 3.138005298544322, -0.009407801356733062, -0.0005678208909298462] + +if __name__ == "__main__": + """Tests""" + + pos1 = [-0.22575, -0.65792, 0.39271, 2.216, 2.196, -0.043] + pos2 = [0.22575, -0.65792, 0.39271, 2.216, 2.196, -0.043] + robot3 = UR(hostname="164.54.116.129") # UR3 + robot5 = UR(hostname="164.54.116.109") # UR5 + + home = [ + 0.5431541204452515, + -1.693524023095602, + -0.7301170229911804, + -2.2898713550963343, + 1.567720651626587, + -1.0230830351458948, + ] + pipette_loc = [ + 0.21285670041158733, + 0.1548897634390196, + 0.005543999069077835, + 3.137978068966478, + -0.009313836267512065, + -0.0008972976992386885, + ] + handE_loc = [ + 0.3131286590368134, + 0.15480163498252172, + 0.005543999069077835, + 3.137978068966478, + -0.009313836267512065, + -0.0008972976992386885, + ] + screwdriver_loc = [ + 0.43804370307762014, + 0.15513117190281586, + 0.006677533813616729, + 3.137978068966478, + -0.009313836267512065, + -0.0008972976992386885, + ] + + tip1 = [ + 0.04314792894103472, + -0.2860322742006418, + 0.2280902599833372, + 3.1380017093793624, + -0.00934365687097245, + -0.0006742913527073343, + ] + sample = [ + 0.46141141854542533, + -0.060288367363232544, + 0.25108778472947074, + 3.1380721475655364, + -0.009380578809401673, + -0.0005480714914954698, + ] + sample_dispense = [ + 0.3171082280819746, + -0.2850972337811901, + 0.3411125132555506, + 3.1379895509880757, + -0.009383853947478633, + -0.0007087863735219047, + ] + vial_cap = [ + 0.46318998963189156, + -0.0618242346521575, + 0.22044247577669074, + 3.1380871312109466, + -0.009283145361593024, + -0.0008304449494246685, + ] + vial_cap_holder = [ + 0.3496362594442045, + -0.19833129786349898, + 0.21851956360142491, + 3.1380370691898447, + -0.00907338154155439, + -0.0006817652068428923, + ] + tip_trash = [ + 0.2584365150735084, + -0.29839447002022784, + 0.26381819707970183, + 3.1380107495494363, + -0.009257765762271986, + -0.0005604922095049701, + ] + + cell_screw = [ + 0.2874263975289342, + -0.2865822322485842, + 0.3180272525328063, + 3.1380331051698533, + -0.009437118292235473, + -0.0007542998874791568, + ] + cell_screw2 = [ + 0.28785074639084496, + -0.3117594886471939, + 0.3180272525328063, + 3.138005298544322, + -0.009407801356733062, + -0.0005678208909298462, + ] # screw_holder = [0.21876722334540147, -0.27273358502932915, 0.39525473397805677, 3.0390618278038524, -0.7398330220514875, 0.016498425988567388] - hex_key = [0.4484990523709619, -0.1237038201799127, 0.2186755997135713, 3.135206222241475, -0.02162004543818643, 0.010296128434757565] - cell_holder = [0.4118605516028128, -0.21084692579385614, 0.21973007226508956, 3.1352348451315843, -0.021708986801283955, 0.010352984644658518] - assembly_deck = [0.3174903285108201, -0.08258211007606345, 0.11525282484663647, 1.2274734115134542, 1.190534780943193, -1.1813375188608897] - assembly_above = [0.31914521296697795, -0.2855210106568889, 0.3477093639368639, 3.1380580674341614, -0.009396149170921641, -0.0006625851593942707] - test_loc = [0.30364466226740844, -0.1243275644148994, 0.2844145579322907, 3.1380384242791366, -0.009336265404641286, -0.0007377624513656736] - - ur5_handE = [0.4505002372355746, 0.16717122871181722, 0.00739685825039485, -3.1306952779568222, 0.020705220743989523, -0.004641385258241053] - ur5_measurement_unit = [-0.04604452261651587, 0.4160887446750063, 0.5646084224408194, -3.130749997794984, 0.020664206546694767, -0.004545336675276125] - ur5_cell_holder = [0.023601991519811308, -0.8269812730950779, 0.22224875259615529, -3.1294422940780655, 0.024367760380236043, -0.006320087283384127] - ur5_home = [0.2069321870803833, -1.4558529642275353, -1.5868407487869263, -1.665375371972555, 1.5850671529769897, -1.350588623677389] + hex_key = [ + 0.4484990523709619, + -0.1237038201799127, + 0.2186755997135713, + 3.135206222241475, + -0.02162004543818643, + 0.010296128434757565, + ] + cell_holder = [ + 0.4118605516028128, + -0.21084692579385614, + 0.21973007226508956, + 3.1352348451315843, + -0.021708986801283955, + 0.010352984644658518, + ] + assembly_deck = [ + 0.3174903285108201, + -0.08258211007606345, + 0.11525282484663647, + 1.2274734115134542, + 1.190534780943193, + -1.1813375188608897, + ] + assembly_above = [ + 0.31914521296697795, + -0.2855210106568889, + 0.3477093639368639, + 3.1380580674341614, + -0.009396149170921641, + -0.0006625851593942707, + ] + test_loc = [ + 0.30364466226740844, + -0.1243275644148994, + 0.2844145579322907, + 3.1380384242791366, + -0.009336265404641286, + -0.0007377624513656736, + ] + + ur5_handE = [ + 0.4505002372355746, + 0.16717122871181722, + 0.00739685825039485, + -3.1306952779568222, + 0.020705220743989523, + -0.004641385258241053, + ] + ur5_measurement_unit = [ + -0.04604452261651587, + 0.4160887446750063, + 0.5646084224408194, + -3.130749997794984, + 0.020664206546694767, + -0.004545336675276125, + ] + ur5_cell_holder = [ + 0.023601991519811308, + -0.8269812730950779, + 0.22224875259615529, + -3.1294422940780655, + 0.024367760380236043, + -0.006320087283384127, + ] + ur5_home = [ + 0.2069321870803833, + -1.4558529642275353, + -1.5868407487869263, + -1.665375371972555, + 1.5850671529769897, + -1.350588623677389, + ] # ---------------------------------------- # print(robot3.ur_connection.getl()) @@ -455,11 +853,11 @@ def run_urp_program(self, transfer_file_path:str = None, program_name: str = Non # robot3.remove_cap(home=home,source=vial_cap,target=vial_cap_holder,gripper_open=120, gripper_close=200) # robot3.place_tool(home,tool_loc=handE_loc) - # # Transfer sample using pipette + # # Transfer sample using pipette # robot3.pick_tool(home,tool_loc=pipette_loc,payload=1.2) # robot3.pipette_transfer(home=home,tip_loc=tip1, tip_trash=tip_trash, source=sample, target=sample_dispense, volume=9) # robot3.place_tool(home,tool_loc=pipette_loc) - + # Install cap on the other side of the cell # robot3.pick_tool(home, handE_loc,payload=1.2) # robot3.place_cap(home=home,source=vial_cap_holder,target=vial_cap,gripper_open=120, gripper_close=200) @@ -472,9 +870,3 @@ def run_urp_program(self, transfer_file_path:str = None, program_name: str = Non robot3.ur.disconnect_ur() robot5.ur.disconnect_ur() - - - - - -