-
Notifications
You must be signed in to change notification settings - Fork 301
/
spawner.py
executable file
·186 lines (156 loc) · 6.8 KB
/
spawner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
# Copyright 2021 PAL Robotics S.L.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import errno
import os
import subprocess
import sys
import time
from controller_manager import configure_controller, list_controllers, \
load_controller, switch_controllers, unload_controller
import rclpy
from rclpy.duration import Duration
from rclpy.node import Node
def wait_for_controller_manager(node, controller_manager, timeout_duration):
def full_name(n):
return n[1] + ('' if n[1].endswith('/') else '/') + n[0]
# Wait for controller_manager
timeout = node.get_clock().now() + Duration(seconds=timeout_duration)
while node.get_clock().now() < timeout:
node_names_and_namespaces = node.get_node_names_and_namespaces()
if any(full_name(n) == controller_manager for n in node_names_and_namespaces):
return True
node.get_logger().info(
'Waiting for {} services'.format(controller_manager),
throttle_duration_sec=2)
time.sleep(0.2)
return False
def is_controller_loaded(node, controller_manager, controller_name):
controllers = list_controllers(node, controller_manager).controller
return any(c.name == controller_name for c in controllers)
def make_absolute(name):
return name if name.startswith('/') else ('/' + name)
def main(args=None):
rclpy.init(args=args)
parser = argparse.ArgumentParser()
parser.add_argument(
'controller_name', help='Name of the controller')
parser.add_argument(
'-c', '--controller-manager', help='Name of the controller manager ROS node',
default='/controller_manager', required=False)
parser.add_argument(
'-p', '--param-file',
help='Controller param file to be loaded into controller node before configure',
required=False)
parser.add_argument(
'--load-only', help='Only load the controller and leave unconfigured.',
action='store_true', required=False)
parser.add_argument(
'--stopped', help='Load and configure the controller, however do not start them',
action='store_true', required=False)
parser.add_argument(
'-t', '--controller-type',
help='If not provided it should exist in the controller manager namespace',
default=None, required=False)
parser.add_argument(
'-u', '--unload-on-kill',
help='Wait until this application is interrupted and unload controller',
action='store_true')
parser.add_argument(
'--controller-manager-timeout',
help='Time to wait for the controller manager', required=False, default=10, type=int)
command_line_args = rclpy.utilities.remove_ros_args(args=sys.argv)[1:]
args = parser.parse_args(command_line_args)
controller_name = args.controller_name
controller_manager_name = make_absolute(args.controller_manager)
param_file = args.param_file
controller_type = args.controller_type
controller_manager_timeout = args.controller_manager_timeout
if param_file and not os.path.isfile(param_file):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), param_file)
node = Node('spawner_' + controller_name)
try:
if not wait_for_controller_manager(node, controller_manager_name,
controller_manager_timeout):
node.get_logger().error('Controller manager not available')
return 1
if is_controller_loaded(node, controller_manager_name, controller_name):
node.get_logger().info('Controller already loaded, skipping load_controller')
else:
if controller_type:
ret = subprocess.run(['ros2', 'param', 'set', controller_manager_name,
controller_name + '.type', controller_type])
ret = load_controller(node, controller_manager_name, controller_name)
if not ret.ok:
# Error message printed by ros2 control
return 1
node.get_logger().info('Loaded ' + controller_name)
if param_file:
ret = subprocess.run(['ros2', 'param', 'load', controller_name,
param_file])
if ret.returncode != 0:
# Error message printed by ros2 param
return ret.returncode
node.get_logger().info('Loaded ' + param_file + ' into ' + controller_name)
if not args.load_only:
ret = configure_controller(node, controller_manager_name, controller_name)
if not ret.ok:
node.get_logger().info('Failed to configure controller')
return 1
if not args.stopped:
ret = switch_controllers(
node,
controller_manager_name,
[],
[controller_name],
True,
True,
5.0)
if not ret.ok:
node.get_logger().info('Failed to start controller')
return 1
node.get_logger().info('Configured and started ' + controller_name)
if not args.unload_on_kill:
return 0
try:
node.get_logger().info('Waiting until interrupt to unload controllers')
while True:
time.sleep(1)
except KeyboardInterrupt:
if not args.stopped:
node.get_logger().info('Interrupt captured, stopping and unloading controller')
ret = switch_controllers(
node,
controller_manager_name,
[controller_name],
[],
True,
True,
5.0)
if not ret.ok:
node.get_logger().info('Failed to stop controller')
return 1
node.get_logger().info('Stopped controller')
ret = unload_controller(node, controller_manager_name, controller_name)
if not ret.ok:
node.get_logger().info('Failed to unload controller')
return 1
node.get_logger().info('Unloaded controller')
return 0
finally:
rclpy.shutdown()
if __name__ == '__main__':
sys.exit(main())