-
Notifications
You must be signed in to change notification settings - Fork 32
/
pybullet_ros.py
241 lines (226 loc) · 12.4 KB
/
pybullet_ros.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
import os
import importlib
import rospy
import pybullet_data
from std_srvs.srv import Empty
from pybullet_ros.function_exec_manager import FuncExecManager
class pyBulletRosWrapper(object):
"""ROS wrapper class for pybullet simulator"""
def __init__(self):
# import pybullet
self.pb = importlib.import_module('pybullet')
# get from param server the frequency at which to run the simulation
self.loop_rate = rospy.get_param('~loop_rate', 80.0)
# query from param server if gui is needed
is_gui_needed = rospy.get_param('~pybullet_gui', True)
# get from param server if user wants to pause simulation at startup
self.pause_simulation = rospy.get_param('~pause_simulation', False)
print('\033[34m')
# print pybullet stuff in blue
physicsClient = self.start_gui(gui=is_gui_needed) # we dont need to store the physics client for now...
# setup service to restart simulation
rospy.Service('~reset_simulation', Empty, self.handle_reset_simulation)
# setup services for pausing/unpausing simulation
rospy.Service('~pause_physics', Empty, self.handle_pause_physics)
rospy.Service('~unpause_physics', Empty, self.handle_unpause_physics)
# get pybullet path in your system and store it internally for future use, e.g. to set floor
self.pb.setAdditionalSearchPath(pybullet_data.getDataPath())
# create object of environment class for later use
env_plugin = rospy.get_param('~environment', 'environment') # default : plugins/environment.py
plugin_import_prefix = rospy.get_param('~plugin_import_prefix', 'pybullet_ros.plugins')
self.environment = getattr(importlib.import_module(f'{plugin_import_prefix}.{env_plugin}'), 'Environment')(self.pb)
# load robot URDF model, set gravity, and ground plane
self.robot = self.init_pybullet_robot()
self.connected_to_physics_server = None
if not self.robot:
self.connected_to_physics_server = False
return # Error while loading urdf file
else:
self.connected_to_physics_server = True
# get all revolute joint names and pybullet index
rev_joint_index_name_dic, prismatic_joint_index_name_dic, fixed_joint_index_name_dic, link_names_to_ids_dic = self.get_properties()
# import plugins dynamically
self.plugins = []
plugins = rospy.get_param('~plugins', [])
if not plugins:
rospy.logwarn('No plugins found, forgot to set param ~plugins?')
# return to normal shell color
print('\033[0m')
# load plugins
for plugin in plugins:
module_ = plugin.pop("module")
class_ = plugin.pop("class")
params_ = plugin.copy()
rospy.loginfo('loading plugin: {} class from {}'.format(class_, module_))
# create object of the imported file class
obj = getattr(importlib.import_module(module_), class_)(self.pb, self.robot,
rev_joints=rev_joint_index_name_dic,
prism_joints=prismatic_joint_index_name_dic,
fixed_joints=fixed_joint_index_name_dic,
link_ids=link_names_to_ids_dic,
**params_)
# store objects in member variable for future use
self.plugins.append(obj)
rospy.loginfo('pybullet ROS wrapper initialized')
def get_properties(self):
"""
construct 3 dictionaries:
- joint index to joint name x2 (1 for revolute, 1 for fixed joints)
- link name to link index dictionary
"""
rev_joint_index_name_dic = {}
fixed_joint_index_name_dic = {}
prismatic_joint_index_name_dic = {}
link_names_to_ids_dic = {}
for joint_index in range(0, self.pb.getNumJoints(self.robot)):
info = self.pb.getJointInfo(self.robot, joint_index)
# build a dictionary of link names to ids
link_names_to_ids_dic[info[12].decode('utf-8')] = joint_index
# ensure we are dealing with a revolute joint
if info[2] == self.pb.JOINT_REVOLUTE:
# insert key, value in dictionary (joint index, joint name)
rev_joint_index_name_dic[joint_index] = info[1].decode('utf-8') # info[1] refers to joint name
elif info[2] == self.pb.JOINT_FIXED:
# insert key, value in dictionary (joint index, joint name)
fixed_joint_index_name_dic[joint_index] = info[1].decode('utf-8') # info[1] refers to joint name
elif info[2] == self.pb.JOINT_PRISMATIC:
prismatic_joint_index_name_dic[joint_index] = info[1].decode('utf-8') # info[1] refers to joint name
return rev_joint_index_name_dic, prismatic_joint_index_name_dic, fixed_joint_index_name_dic, link_names_to_ids_dic
def handle_reset_simulation(self, req):
"""Callback to handle the service offered by this node to reset the simulation"""
rospy.loginfo('reseting simulation now')
self.pb.resetSimulation()
return Empty()
def start_gui(self, gui=True):
"""start physics engine (client) with or without gui"""
if(gui):
# start simulation with gui
rospy.loginfo('Running pybullet with gui')
rospy.loginfo('-------------------------')
gui_options = rospy.get_param('~gui_options', '') # e.g. to maximize screen: options="--width=2560 --height=1440"
return self.pb.connect(self.pb.GUI, options=gui_options)
else:
# start simulation without gui (non-graphical version)
rospy.loginfo('Running pybullet without gui')
# hide console output from pybullet
rospy.loginfo('-------------------------')
return self.pb.connect(self.pb.DIRECT)
def init_pybullet_robot(self):
"""load robot URDF model, set gravity, ground plane and environment"""
# get from param server the path to the URDF robot model to load at startup
urdf_path = rospy.get_param('~robot_urdf_path', None)
if urdf_path == None:
rospy.signal_shutdown('mandatory param robot_urdf_path not set, will exit now')
# test urdf file existance
if not os.path.isfile(urdf_path):
rospy.logerr('param robot_urdf_path is set, but file does not exist : ' + urdf_path)
rospy.signal_shutdown('required robot urdf file not found')
return None
# ensure urdf is not xacro, but if it is then make urdf file version out of it
if 'xacro' in urdf_path:
robot_description = rospy.get_param('robot_description', None)
if not robot_description:
rospy.logerr('required robot_description param not set')
return None
# remove xacro from name
urdf_path_without_xacro = urdf_path[0:urdf_path.find('.xacro')]+urdf_path[urdf_path.find('.xacro')+len('.xacro'):]
rospy.loginfo('generating urdf model from xacro from robot_description param server under: {0}'.format(urdf_path_without_xacro))
try:
urdf_file = open(urdf_path_without_xacro,'w')
except:
rospy.logerr('Failed to create urdf file from xacro, cannot write into destination: {0}'.format(urdf_path_without_xacro))
return None
urdf_file.write(robot_description)
urdf_file.close()
urdf_path = urdf_path_without_xacro
# get robot spawn pose from parameter server
robot_pose_x = rospy.get_param('~robot_pose_x', 0.0)
robot_pose_y = rospy.get_param('~robot_pose_y', 0.0)
robot_pose_z = rospy.get_param('~robot_pose_z', 1.0)
robot_pose_yaw = rospy.get_param('~robot_pose_yaw', 0.0)
robot_spawn_orientation = self.pb.getQuaternionFromEuler([0.0, 0.0, robot_pose_yaw])
fixed_base = rospy.get_param('~fixed_base', False)
# load robot from URDF model
# user decides if inertia is computed automatically by pybullet or custom
if rospy.get_param('~use_intertia_from_file', False):
# combining several boolean flags using "or" according to pybullet documentation
urdf_flags = self.pb.URDF_USE_INERTIA_FROM_FILE | self.pb.URDF_USE_SELF_COLLISION
else:
urdf_flags = self.pb.URDF_USE_SELF_COLLISION
# load environment
rospy.loginfo('loading environment')
self.environment.load_environment()
# set no realtime simulation, NOTE: no need to stepSimulation if setRealTimeSimulation is set to 1
self.pb.setRealTimeSimulation(0) # NOTE: does not currently work with effort controller, thats why is left as 0
rospy.loginfo('loading urdf model: ' + urdf_path)
# NOTE: self collision enabled by default
return self.pb.loadURDF(urdf_path, basePosition=[robot_pose_x, robot_pose_y, robot_pose_z],
baseOrientation=robot_spawn_orientation,
useFixedBase=fixed_base, flags=urdf_flags)
def handle_reset_simulation(self, req):
"""Callback to handle the service offered by this node to reset the simulation"""
rospy.loginfo('reseting simulation now')
# pause simulation to prevent reading joint values with an empty world
self.pause_simulation = True
# remove all objects from the world and reset the world to initial conditions
self.pb.resetSimulation()
# load URDF model again, set gravity and floor
self.init_pybullet_robot()
# resume simulation control cycle now that a new robot is in place
self.pause_simulation = False
return []
def handle_pause_physics(self, req):
"""pause simulation, raise flag to prevent pybullet to execute self.pb.stepSimulation()"""
rospy.loginfo('pausing simulation')
self.pause_simulation = False
return []
def handle_unpause_physics(self, req):
"""unpause simulation, lower flag to allow pybullet to execute self.pb.stepSimulation()"""
rospy.loginfo('unpausing simulation')
self.pause_simulation = True
return []
def pause_simulation_function(self):
return self.pause_simulation
def start_pybullet_ros_wrapper_sequential(self):
"""
This function is deprecated, we recommend the use of parallel plugin execution
"""
rate = rospy.Rate(self.loop_rate)
while not rospy.is_shutdown():
if not self.pause_simulation:
# run x plugins
for task in self.plugins:
task.execute()
# perform all the actions in a single forward dynamics simulation step such
# as collision detection, constraint solving and integration
self.pb.stepSimulation()
rate.sleep()
rospy.logwarn('killing node now...')
# if node is killed, disconnect
if self.connected_to_physics_server:
self.pb.disconnect()
def start_pybullet_ros_wrapper_parallel(self):
"""
Execute plugins in parallel, however watch their execution time and warn if exceeds the deadline (loop rate)
"""
# create object of our parallel execution manager
exec_manager_obj = FuncExecManager(self.plugins, rospy.is_shutdown, self.pb.stepSimulation, self.pause_simulation_function,
log_info=rospy.loginfo, log_warn=rospy.logwarn, log_debug=rospy.logdebug, function_name='plugin')
# start parallel execution of all "execute" class methods in a synchronous way
exec_manager_obj.start_synchronous_execution(loop_rate=self.loop_rate)
# ctrl + c was pressed, exit
rospy.logwarn('killing node now...')
# if node is killed, disconnect
if self.connected_to_physics_server:
self.pb.disconnect()
def start_pybullet_ros_wrapper(self):
if rospy.get_param('~parallel_plugin_execution', True):
self.start_pybullet_ros_wrapper_parallel()
else:
self.start_pybullet_ros_wrapper_sequential()
def main():
"""function called by pybullet_ros_node script"""
rospy.init_node('pybullet_ros', anonymous=False) # node name gets overrided if launched by a launch file
pybullet_ros_interface = pyBulletRosWrapper()
pybullet_ros_interface.start_pybullet_ros_wrapper()