Skip to content

Commit ef3930e

Browse files
committed
Add more tests for launch_utils of controller manager (ros-controls#2147)
1 parent 152dce9 commit ef3930e

9 files changed

+805
-0
lines changed

controller_manager/CMakeLists.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,34 @@ if(BUILD_TESTING)
244244
)
245245

246246
find_package(ament_cmake_pytest REQUIRED)
247+
find_package(launch_testing_ament_cmake REQUIRED)
248+
find_package(rclpy REQUIRED)
247249
install(FILES test/test_ros2_control_node.yaml
250+
test/test_controller_load.yaml
251+
test/test_ros2_control_node_combined.yaml
248252
DESTINATION test)
249253
ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py)
250254
ament_add_pytest_test(test_test_utils test/test_test_utils.py)
255+
ament_add_pytest_test(test_launch_utils_unit test/test_launch_utils_unit.py
256+
APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
257+
PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_CURRENT_SOURCE_DIR}:${PYTHONPATH}
258+
)
259+
# ----------------------------------------------------------------------------
260+
# Integration tests for individual launch_utils entry points
261+
# ----------------------------------------------------------------------------
262+
ament_add_pytest_test(test_launch_utils_integration_list
263+
test/test_launch_utils_integration_list.py
264+
TIMEOUT 60
265+
)
266+
ament_add_pytest_test(test_launch_utils_integration_dict
267+
test/test_launch_utils_integration_dict.py
268+
TIMEOUT 60
269+
)
270+
ament_add_pytest_test(test_launch_utils_integration_load
271+
test/test_launch_utils_integration_load.py
272+
TIMEOUT 60
273+
)
274+
251275
endif()
252276

253277
install(
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
controller_manager:
2+
ros__parameters:
3+
update_rate: 100 # Hz
4+
5+
controller3:
6+
ros__parameters:
7+
type: "controller_manager/test_controller"
8+
joint_names: ["joint3"]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
controller_manager:
2+
ros__parameters:
3+
update_rate: 100 # Hz
4+
5+
test_controller_load:
6+
ros__parameters:
7+
type: "controller_manager/test_controller"
8+
joint_names: ["joint1"]
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
controller_manager:
2+
ros__parameters:
3+
update_rate: 100 # Hz
4+
5+
joint_state_broadcaster:
6+
ros__parameters:
7+
type: "joint_state_broadcaster/JointStateBroadcaster"
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2025 Robert Kwan
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import pytest
17+
import unittest
18+
import os
19+
import tempfile
20+
import time
21+
22+
from ament_index_python.packages import get_package_share_directory, get_package_prefix
23+
from launch import LaunchDescription
24+
import launch_testing
25+
from launch_testing.actions import ReadyToTest
26+
import launch_testing.markers
27+
import launch_ros.actions
28+
29+
import rclpy
30+
31+
from controller_manager.launch_utils import (
32+
generate_controllers_spawner_launch_description_from_dict,
33+
)
34+
35+
36+
@pytest.mark.launch_test
37+
def generate_test_description():
38+
"""
39+
Generate launch description for testing.
40+
41+
THIS VERSION CREATES ALL NEEDED FILES DYNAMICALLY AND USES THE COMBINED CONFIG.
42+
"""
43+
44+
# Create temporary directory for all test files
45+
temp_dir = tempfile.mkdtemp()
46+
print(f"Creating test files in: {temp_dir}")
47+
48+
# Get URDF, without involving xacro
49+
urdf = os.path.join(
50+
get_package_share_directory("ros2_control_test_assets"),
51+
"urdf",
52+
"test_hardware_components.urdf",
53+
)
54+
with open(urdf) as infp:
55+
robot_description_content = infp.read()
56+
robot_description = {"robot_description": robot_description_content}
57+
58+
robot_controllers = os.path.join(
59+
get_package_prefix("controller_manager"), "test", "test_ros2_control_node_combined.yaml"
60+
)
61+
62+
# Verify both files exist
63+
assert os.path.isfile(robot_controllers), f"Controller config not created: {robot_controllers}"
64+
assert os.path.isfile(urdf), f"URDF not created: {urdf}"
65+
66+
robot_state_pub_node = launch_ros.actions.Node(
67+
package="robot_state_publisher",
68+
executable="robot_state_publisher",
69+
output="both",
70+
parameters=[robot_description],
71+
)
72+
73+
# ===== START CONTROLLER MANAGER (ros2_control_node) =====
74+
control_node = launch_ros.actions.Node(
75+
package="controller_manager",
76+
executable="ros2_control_node",
77+
parameters=[robot_controllers], # Use the combined config file
78+
output="both",
79+
)
80+
81+
# The dictionary keys are the controller names to be spawned/started.
82+
# Values can be empty lists since config is provided via the main YAML.
83+
ctrl_dict = {
84+
"joint_state_broadcaster": [robot_controllers],
85+
"ctrl_with_parameters_and_type": [robot_controllers],
86+
"controller3": [robot_controllers],
87+
}
88+
controller_list = list(ctrl_dict.keys())
89+
90+
# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
91+
print(f"Spawning controllers: {controller_list}")
92+
93+
# Correct function name and call
94+
spawner_ld = generate_controllers_spawner_launch_description_from_dict(
95+
controller_info_dict=ctrl_dict,
96+
)
97+
98+
# ===== CREATE LAUNCH DESCRIPTION =====
99+
ld = LaunchDescription(
100+
[robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities
101+
)
102+
103+
# Return tuple with launch description and test context
104+
return ld, {
105+
"controller_list": controller_list, # Key name updated to match the test function
106+
"robot_controllers": str(robot_controllers),
107+
"urdf_file": urdf,
108+
"temp_dir": temp_dir,
109+
}
110+
111+
112+
# Active tests
113+
class TestControllerSpawnerList(unittest.TestCase):
114+
"""Active tests that run while the launch is active."""
115+
116+
@classmethod
117+
def setUpClass(cls):
118+
rclpy.init()
119+
120+
@classmethod
121+
def tearDownClass(cls):
122+
rclpy.shutdown()
123+
124+
def test_spawner_nodes_launched(self, proc_info):
125+
"""Ensure processes are running."""
126+
process_names = proc_info.process_names()
127+
self.assertGreater(len(process_names), 0)
128+
print("\n[TEST] Active processes: {process_names}")
129+
130+
def test_controllers_loaded(self, proc_info, controller_list):
131+
"""Test that controllers were loaded (poll until they appear)."""
132+
node = rclpy.create_node("test_controller_query_node")
133+
134+
try:
135+
from controller_manager_msgs.srv import ListControllers
136+
137+
client = node.create_client(ListControllers, "/controller_manager/list_controllers")
138+
139+
print("\n[TEST] Waiting for controller_manager service...")
140+
wait_for_svc_timeout = 30.0
141+
if not client.wait_for_service(timeout_sec=wait_for_svc_timeout):
142+
process_names = proc_info.process_names()
143+
self.fail(
144+
f"Controller manager service not available after {wait_for_svc_timeout}s.\n"
145+
f"Active processes: {process_names}"
146+
)
147+
148+
# Poll for controllers to be registered
149+
print("[TEST] Service available, polling for controllers (timeout 30s)...")
150+
deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0)
151+
seen = []
152+
while node.get_clock().now() < deadline:
153+
req = ListControllers.Request()
154+
fut = client.call_async(req)
155+
rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
156+
if fut.done() and fut.result() is not None:
157+
response = fut.result()
158+
seen = [c.name for c in response.controller]
159+
if all(ctrl in seen for ctrl in controller_list):
160+
print(f"[TEST] Loaded controllers: {seen}")
161+
break
162+
# small sleep to avoid tight-loop
163+
time.sleep(0.2)
164+
else:
165+
# timeout expired
166+
self.fail(
167+
f"Timeout waiting for controllers to be loaded. "
168+
f"Expected: {controller_list}, saw: {seen}"
169+
)
170+
171+
# Final assert (defensive)
172+
for controller in controller_list:
173+
self.assertIn(
174+
controller,
175+
seen,
176+
f"Controller '{controller}' was not loaded. Available: {seen}",
177+
)
178+
179+
print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully")
180+
181+
finally:
182+
node.destroy_node()
183+
184+
def test_spawner_exit_code(self, proc_info):
185+
"""Test that spawner process ran (may have completed already)."""
186+
process_names = proc_info.process_names()
187+
print(f"\n[TEST] Checking for spawner in: {process_names}")
188+
189+
# The spawner may have already completed successfully and exited
190+
# So we just verify that we have processes running
191+
self.assertGreater(len(process_names), 0)
192+
print(f"[TEST] ? Launch has {len(process_names)} active processes")
193+
194+
195+
@launch_testing.post_shutdown_test()
196+
class TestProcessOutput(unittest.TestCase):
197+
"""Post-shutdown tests."""
198+
199+
def test_exit_codes(self, proc_info):
200+
"""Verify all processes exited successfully."""
201+
print("\n[POST-SHUTDOWN] Process exit codes:")
202+
for process_name in proc_info.process_names():
203+
info = proc_info[process_name]
204+
print(f" {process_name}: {info.returncode}")
205+
206+
for process_name in proc_info.process_names():
207+
info = proc_info[process_name]
208+
209+
if "ros2_control_node" in process_name:
210+
self.assertEqual(
211+
info.returncode, 0, f"{process_name} exited with {info.returncode}"
212+
)
213+
elif "spawner" in process_name:
214+
# Spawner should complete successfully (0) or be terminated
215+
self.assertIn(
216+
info.returncode,
217+
[0, -2, -15],
218+
f"Spawner {process_name} exited with {info.returncode}",
219+
)
220+
else:
221+
self.assertIn(
222+
info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}"
223+
)
224+
225+
print("[POST-SHUTDOWN] ? All processes exited as expected")
226+
227+
def test_cleanup_temp_files(self, temp_dir, robot_controllers, urdf_file):
228+
"""Clean up temporary test files."""
229+
import shutil
230+
231+
print(f"\n[CLEANUP] Removing temporary directory: {temp_dir}")
232+
233+
# The original clean-up logic was commented out, enabling it for safety
234+
try:
235+
if os.path.exists(temp_dir):
236+
shutil.rmtree(temp_dir)
237+
238+
print("[CLEANUP] ? Temporary files removed")
239+
except Exception as e:
240+
print(f"[CLEANUP] Warning: Cleanup failed: {e}")

0 commit comments

Comments
 (0)