-
Notifications
You must be signed in to change notification settings - Fork 3
/
stretch_controller.py
340 lines (268 loc) · 13.5 KB
/
stretch_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import sys
import time
from typing import Any, Optional, Tuple
import cv2
import numpy as np
import rerun as rr
from airo_camera_toolkit.utils.image_converter import ImageConverter
from airo_typing import HomogeneousMatrixType, OpenCVIntImageType, PointCloud
from cloth_tools.controllers.controller import Controller
from cloth_tools.controllers.grasp_highest_controller import hang_in_the_air_tcp_pose
from cloth_tools.controllers.grasp_lowest_controller import create_cloth_obstacle_planner
from cloth_tools.drake.visualization import publish_dual_arm_trajectory
from cloth_tools.ompl.dual_arm_planner import DualArmOmplPlanner
from cloth_tools.path.execution import execute_dual_arm_trajectory, time_parametrize_toppra
from cloth_tools.point_clouds.camera import get_image_and_filtered_point_cloud
from cloth_tools.stations.competition_station import CompetitionStation
from cloth_tools.stations.dual_arm_station import DualArmStation
from cloth_tools.visualization.opencv import draw_pose
from cloth_tools.visualization.rerun import rr_log_camera
from cloth_tools.wrench_smoother import WrenchSmoother
from linen.elemental.move_backwards import move_pose_backwards
from loguru import logger
from pydrake.trajectories import Trajectory
class StretchController(Controller):
"""
Grasps the lowest point of a piece of cloth and lift the cloth into the air.
Currently always uses the left arm to grasp.
"""
def __init__(self, station: DualArmStation):
self.station = station
# Attributes that will be set in plan()
self._image: Optional[OpenCVIntImageType] = None
self._point_cloud: Optional[PointCloud] = None
self._stretch_pose_left: Optional[HomogeneousMatrixType] = None
self._stretch_pose_right: Optional[HomogeneousMatrixType] = None
self._path_to_stretch: Optional[Any] = None
self._trajectory_to_stretch: Optional[Trajectory] = None
self._time_trajectory_to_stretch: Optional[Trajectory] = None
camera = self.station.camera
camera_pose = self.station.camera_pose
# Setting up the OpenCV window
window_name = self.__class__.__name__
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 1280, 720)
# Setting up Rerun
rr.init(window_name, spawn=True)
rr_log_camera(camera, camera_pose)
def _create_cloth_obstacle_planner(self, point_cloud_cloth: PointCloud) -> DualArmOmplPlanner:
(
planner,
hull,
diagram,
context,
collision_checker,
meshcat,
arm_indices,
gripper_indices,
) = create_cloth_obstacle_planner(self.station, point_cloud_cloth, left_hanging=True)
# Save all this for visualization
self._hull = hull
self._diagram = diagram
self._context = context
self._collision_checker = collision_checker
self._meshcat = meshcat
self._arm_indices = arm_indices
self._gripper_indices = gripper_indices
return planner
def plan(self) -> None:
logger.info(f"{self.__class__.__name__}: Creating new plan.")
camera = self.station.camera
camera_pose = self.station.camera_pose
# These are retrieved just for visualization
image_rgb, _, point_cloud = get_image_and_filtered_point_cloud(camera, camera_pose)
image = ImageConverter.from_numpy_int_format(image_rgb).image_in_opencv_format
self._image = image
self._point_cloud = point_cloud
hang_pose_left = hang_in_the_air_tcp_pose(left=True)
hang_pose_right = hang_in_the_air_tcp_pose(left=False)
X_LCB_TCP = self.station.dual_arm.left_manipulator.get_tcp_pose()
X_RCB_TCP = self.station.dual_arm.right_manipulator.get_tcp_pose()
X_W_LCB = self.station.left_arm_pose
X_W_RCB = self.station.right_arm_pose
X_W_LTCP = X_W_LCB @ X_LCB_TCP
X_W_RTCP = X_W_RCB @ X_RCB_TCP
tcp_distance = np.linalg.norm(X_W_LTCP[:3, 3] - X_W_RTCP[:3, 3])
logger.info(f"{self.__class__.__name__}: Distance between TCPs at start: {tcp_distance:.3f} meters")
backwards_shift = 0.9 * (tcp_distance / 2.0) # set distance to a percentage of the distance when grasping
stretch_pose_left = move_pose_backwards(hang_pose_left, backwards_shift)
stretch_pose_right = move_pose_backwards(hang_pose_right, backwards_shift)
# Move arms closer to the camera
global_x_shift = -0.4
stretch_pose_left[:3, 3] += np.array([global_x_shift, 0, 0])
stretch_pose_right[:3, 3] += np.array([global_x_shift, 0, 0])
self._stretch_pose_left = stretch_pose_left
self._stretch_pose_right = stretch_pose_right
dual_arm = self.station.dual_arm
start_joints_left = dual_arm.left_manipulator.get_joint_configuration()
start_joints_right = dual_arm.right_manipulator.get_joint_configuration()
planner = self.station.planner
path_to_stretch = planner.plan_to_tcp_pose(
start_joints_left, start_joints_right, stretch_pose_left, stretch_pose_right
)
self._path_to_stretch = path_to_stretch
if path_to_stretch is None:
return
# Time parametrize the path
# Move very slowly
trajectory_to_stretch, time_trajectory_to_stretch = time_parametrize_toppra(
path_to_stretch, self.station._diagram.plant(), joint_speed_limit=0.5, joint_acceleration_limit=0.5
)
self._trajectory_to_stretch = trajectory_to_stretch
self._time_trajectory_to_stretch = time_trajectory_to_stretch
def execute_stretch_with_force(self):
station = self.station
dual_arm = station.dual_arm
X_W_LCB = station.left_arm_pose
X_W_RCB = station.right_arm_pose
tension_threshold = 4.0
tcp_distance_threshold = 0.9 # never move more that this distance apart
servo_period = 0.05 # seconds
servo_speed_start = 0.05 # meters per second starting fast result in a shock in the F/T readings
servo_distance_start = servo_speed_start * servo_period
servo_distance = servo_distance_start
servos_per_second = 1.0 / servo_period
history = int(servos_per_second * 2) # smooth over last 2 seconds
history = max(history, 1)
logger.info("Smoothing over a history of {} samples".format(history))
wrench_smoother_left = WrenchSmoother(history=history)
wrench_smoother_right = WrenchSmoother(history=history)
servo_awaitable = None
warmup = 1.0
timeout = 5.0
time_start = time.time()
while True:
wrench_left = dual_arm.left_manipulator.rtde_receive.getActualTCPForce()
wrench_right = dual_arm.right_manipulator.rtde_receive.getActualTCPForce()
wrench_smoothed_left = wrench_smoother_left.add_wrench(wrench_left)
wrench_smoothed_right = wrench_smoother_right.add_wrench(wrench_right)
# log each scalar
for i, label in zip(range(3), ["Fx", "Fy", "Fz"]):
rr.log(f"/force/left/{label}", rr.Scalar(wrench_smoothed_left[i]))
rr.log(f"/force/right/{label}", rr.Scalar(wrench_smoothed_right[i]))
for i, label in zip(range(3, 6), ["Tx", "Ty", "Tz"]):
rr.log(f"/torque/left/{label}", rr.Scalar(wrench_smoothed_left[i]))
rr.log(f"/torque/right/{label}", rr.Scalar(wrench_smoothed_right[i]))
tension = (wrench_smoothed_left[0] - wrench_smoothed_right[0]) / 2.0
logger.info(f"Tension: {tension:.1f} N")
rr.log("/force/tension", rr.Scalar(tension))
if time.time() - time_start < warmup:
# During the warmup time don't servo or check for tension
continue
if time.time() - time_start > timeout:
logger.warning(f"Tension threshold not reached within timeout, tension: {tension}")
dual_arm.left_manipulator.rtde_control.servoStop()
dual_arm.right_manipulator.rtde_control.servoStop()
break
X_LCB_TCP = dual_arm.left_manipulator.get_tcp_pose()
X_RCB_TCP = dual_arm.right_manipulator.get_tcp_pose()
X_W_LTCP = X_W_LCB @ X_LCB_TCP
X_W_RTCP = X_W_RCB @ X_RCB_TCP
tcp_distance = np.linalg.norm(X_W_LTCP[:3, 3] - X_W_RTCP[:3, 3])
X_servo_L = move_pose_backwards(X_W_LTCP, servo_distance)
X_servo_R = move_pose_backwards(X_W_RTCP, servo_distance)
if servo_awaitable is not None:
servo_awaitable.wait()
if tension > tension_threshold:
logger.info(f"Tension reached threshold {tension:.2f} N")
dual_arm.left_manipulator.rtde_control.servoStop()
dual_arm.right_manipulator.rtde_control.servoStop()
break
if tcp_distance > tcp_distance_threshold:
logger.info(f"TCP distance reached threshold {tcp_distance:.2f} m")
dual_arm.left_manipulator.rtde_control.servoStop()
dual_arm.right_manipulator.rtde_control.servoStop()
break
servo_awaitable = dual_arm.servo_to_tcp_pose(X_servo_L, X_servo_R, time=servo_period) # about 1 cm/s
def execute_stretch(self) -> None:
dual_arm = self.station.dual_arm
# Execute the path to the strech start poses
execute_dual_arm_trajectory(dual_arm, self._trajectory_to_stretch, self._time_trajectory_to_stretch)
# Execute the stretch with force
self.execute_stretch_with_force()
def _can_execute(self) -> bool:
# maybe this should just be a property?
if self._trajectory_to_stretch is None:
return False
return True
def visualize_plan(self) -> Tuple[OpenCVIntImageType, Any]: # noqa C901
if self._image is None:
raise RuntimeError("You must call plan() before visualize_plan().")
image = self._image
camera_pose = self.station.camera_pose
intrinsics = self.station.camera.intrinsics_matrix()
if self._stretch_pose_left is not None:
pose = self._stretch_pose_left
draw_pose(image, pose, intrinsics, camera_pose)
rr_pose = rr.Transform3D(translation=pose[0:3, 3], mat3x3=pose[0:3, 0:3] @ np.identity(3) * 0.1)
rr.log("world/stretch_pose_left", rr_pose)
if self._stretch_pose_right is not None:
pose = self._stretch_pose_right
draw_pose(image, pose, intrinsics, camera_pose)
rr_pose = rr.Transform3D(translation=pose[0:3, 3], mat3x3=pose[0:3, 0:3] @ np.identity(3) * 0.1)
rr.log("world/stretch_pose_right", rr_pose)
if self._path_to_stretch is not None:
trajectory = self._trajectory_to_stretch
time_trajectory = self._time_trajectory_to_stretch
publish_dual_arm_trajectory(
trajectory,
time_trajectory,
self.station._meshcat,
self.station._diagram,
self.station._context,
*self.station._arm_indices,
)
if self._point_cloud is not None:
point_cloud = self._point_cloud
rr_point_cloud = rr.Points3D(positions=point_cloud.points, colors=point_cloud.colors)
rr.log("world/point_cloud", rr_point_cloud)
# Duplicated from GraspHighestController
if self._can_execute():
image_annotated = image.copy()
text = "Execute? Press (y/n)"
cv2.putText(image_annotated, text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 3, cv2.LINE_AA)
logger.info(f"{self.__class__.__name__}: {text} in OpenCV window.")
cv2.imshow(self.__class__.__name__, image_annotated)
key = cv2.waitKey(0)
return image, key
else:
cv2.imshow(self.__class__.__name__, image)
logger.info(f"{self.__class__.__name__}: Execution not possible yet. Press any key to plan again.")
return image, cv2.waitKey(0)
def execute_plan(self) -> None:
if not self._can_execute():
logger.warning(f"{self.__class__.__name__}: not executing because the plan is not complete.")
return
self.execute_stretch()
# TODO: remove this duplication, maybe through inheritance?
def execute_interactive(self) -> None:
while True:
self.plan()
_, key = self.visualize_plan()
if key == ord("p"):
key = cv2.waitKey(0)
elif key == ord("y") and self._can_execute():
self.execute_plan()
return
elif key == ord("n"):
continue
elif key == ord("q"):
sys.exit(0)
def execute(self, interactive: bool = True) -> None:
logger.info(f"{self.__class__.__name__} started.")
if interactive:
self.execute_interactive()
else:
# Autonomous execution
self.plan()
self.visualize_plan()
self.execute_plan()
# Close cv2 window to reduce clutter
cv2.destroyWindow(self.__class__.__name__)
logger.info(f"{self.__class__.__name__} finished.")
if __name__ == "__main__":
station = CompetitionStation()
dual_arm = station.dual_arm
# Assumes both arms are aleady holding the cloth
stretch_controller = StretchController(station)
stretch_controller.execute(interactive=True)