Skip to content

Conversation

@robkwan
Copy link

@robkwan robkwan commented Nov 2, 2025

Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that:

  1. Limited scope. Your PR should do one thing or one set of things. Avoid adding “random fixes” to PRs. Put those on separate PRs.
  2. Give your PR a descriptive title. Add a short summary, if required.
  3. Make sure the pipeline is green.
  4. Don’t be afraid to request reviews from maintainers.
  5. New code = new tests. If you are adding new functionality, always make sure to add some tests exercising the code and serving as live documentation of your original intention.

To send us a pull request, please:

  • Fork the repository.
  • Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change.
  • Ensure local tests pass. (colcon test and pre-commit run (requires you to install pre-commit by pip3 install pre-commit)
  • Commit to your fork using clear commit messages.
  • Send a pull request, answering any default questions in the pull request interface.
  • Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation.

Copy link
Member

@christophfroehlich christophfroehlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests you added are some sort of unit tests, checking if the returned objects are of expected type/size. But it does not check if the returned actions work as expected (let's call this integration test).

Can you please extract the new tests from this file (new tests are not testing ros2_control_node at all, and don't need launch_ros or launch_testing) in a launch_utils_unit_test file, and add distinct files for the integration tests of the three methods of the launch_utils? You could also extend the existing test_ros2_control_node_launch.py by adding more controllers, and spawn different combinations with the different methods at once.

Please also install pre-commit and fix the failing tests by running pre-commit run --all

@codecov
Copy link

codecov bot commented Nov 2, 2025

Codecov Report

❌ Patch coverage is 88.23529% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.65%. Comparing base (a7e0b3a) to head (05e3d32).

Files with missing lines Patch % Lines
...ller_manager/test/test_ros2_control_node_launch.py 88.23% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2768      +/-   ##
==========================================
+ Coverage   89.62%   89.65%   +0.03%     
==========================================
  Files         152      152              
  Lines       17637    17670      +33     
  Branches     1448     1451       +3     
==========================================
+ Hits        15807    15842      +35     
+ Misses       1246     1241       -5     
- Partials      584      587       +3     
Flag Coverage Δ
unittests 89.65% <88.23%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...ller_manager/test/test_ros2_control_node_launch.py 94.66% <88.23%> (-5.34%) ⬇️

... and 2 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@robkwan
Copy link
Author

robkwan commented Nov 3, 2025

Thanks for the comment, @christophfroehlich! Let me try again then...

@robkwan
Copy link
Author

robkwan commented Nov 9, 2025

Hi @christophfroehlich, I hope that it will work ok this time please. I first did the pre-commit ok in my jazzy branch but once I merged back to the master branch, I forgot to redo the pre-commit and so I had another commit to fix that again! Thanks!

Copy link
Member

@christophfroehlich christophfroehlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the work.
Without detailed review, I have two requests:

  • We have so many files already in the test folder. Please move the new files in a subfolder of test (call it test_launch_utils or similar), maybe with a dedicated cmake file, which is just included from the main one via add_subdirectory(test/test_launch_utils).
  • Follow the best practices for launch files, see #2767.

robkwan added a commit to robkwan/ros2_control that referenced this pull request Nov 16, 2025
@robkwan
Copy link
Author

robkwan commented Nov 16, 2025

Hi @christophfroehlich, thanks for the feedback. And I just tried to make the changes and committed again [ef44894]. Hope that it will be ok this time. By the way, I will be out of town from next weekend and so I won't be able to work on it till I am back early next year. So I hope that I can help to close this before I depart then. Thanks!

Copy link
Member

@christophfroehlich christophfroehlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the first python file, if you agree please apply the changes also to the others.

Comment on lines 248 to 249
find_package(launch_testing_ament_cmake REQUIRED)
find_package(rclpy REQUIRED)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
find_package(launch_testing_ament_cmake REQUIRED)
find_package(rclpy REQUIRED)

As this is also included in the cmake file in the subdir

Comment on lines 31 to 42
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
TIMEOUT 60
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
)

nitpick: Isn't 60 the default value?


print("[POST-SHUTDOWN] ? All processes exited as expected")

def test_cleanup_temp_files(self, temp_dir):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why this is necessary? We don't do any cleanups in other pytests.

Comment on lines 133 to 185
def test_controllers_loaded(self, proc_info, controller_list):
"""Test that controllers were loaded (poll until they appear)."""
node = rclpy.create_node("test_controller_query_node")

try:
from controller_manager_msgs.srv import ListControllers

client = node.create_client(ListControllers, "/controller_manager/list_controllers")

print("\n[TEST] Waiting for controller_manager service...")
wait_for_svc_timeout = 30.0
if not client.wait_for_service(timeout_sec=wait_for_svc_timeout):
process_names = proc_info.process_names()
self.fail(
f"Controller manager service not available after {wait_for_svc_timeout}s.\n"
f"Active processes: {process_names}"
)

# Poll for controllers to be registered
print("[TEST] Service available, polling for controllers (timeout 30s)...")
deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0)
seen = []
while node.get_clock().now() < deadline:
req = ListControllers.Request()
fut = client.call_async(req)
rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
if fut.done() and fut.result() is not None:
response = fut.result()
seen = [c.name for c in response.controller]
if all(ctrl in seen for ctrl in controller_list):
print(f"[TEST] Loaded controllers: {seen}")
break
# small sleep to avoid tight-loop
time.sleep(0.2)
else:
# timeout expired
self.fail(
f"Timeout waiting for controllers to be loaded. "
f"Expected: {controller_list}, saw: {seen}"
)

# Final assert (defensive)
for controller in controller_list:
self.assertIn(
controller,
seen,
f"Controller '{controller}' was not loaded. Available: {seen}",
)

print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully")

finally:
node.destroy_node()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using this?

Suggested change
def test_controllers_loaded(self, proc_info, controller_list):
"""Test that controllers were loaded (poll until they appear)."""
node = rclpy.create_node("test_controller_query_node")
try:
from controller_manager_msgs.srv import ListControllers
client = node.create_client(ListControllers, "/controller_manager/list_controllers")
print("\n[TEST] Waiting for controller_manager service...")
wait_for_svc_timeout = 30.0
if not client.wait_for_service(timeout_sec=wait_for_svc_timeout):
process_names = proc_info.process_names()
self.fail(
f"Controller manager service not available after {wait_for_svc_timeout}s.\n"
f"Active processes: {process_names}"
)
# Poll for controllers to be registered
print("[TEST] Service available, polling for controllers (timeout 30s)...")
deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0)
seen = []
while node.get_clock().now() < deadline:
req = ListControllers.Request()
fut = client.call_async(req)
rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
if fut.done() and fut.result() is not None:
response = fut.result()
seen = [c.name for c in response.controller]
if all(ctrl in seen for ctrl in controller_list):
print(f"[TEST] Loaded controllers: {seen}")
break
# small sleep to avoid tight-loop
time.sleep(0.2)
else:
# timeout expired
self.fail(
f"Timeout waiting for controllers to be loaded. "
f"Expected: {controller_list}, saw: {seen}"
)
# Final assert (defensive)
for controller in controller_list:
self.assertIn(
controller,
seen,
f"Controller '{controller}' was not loaded. Available: {seen}",
)
print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully")
finally:
node.destroy_node()
def test_controllers_start(self):
cnames = ["ctrl_with_parameters_and_type"]
check_controllers_running(self.node, cnames, state="active")

Needs
from controller_manager.test_utils import check_controllers_running

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you see an improvement to check_controllers_running, please open a PR for it! :)

Comment on lines 204 to 228
print("\n[POST-SHUTDOWN] Process exit codes:")
for process_name in proc_info.process_names():
info = proc_info[process_name]
print(f" {process_name}: {info.returncode}")

for process_name in proc_info.process_names():
info = proc_info[process_name]

if "ros2_control_node" in process_name:
self.assertEqual(
info.returncode, 0, f"{process_name} exited with {info.returncode}"
)
elif "spawner" in process_name:
# Spawner should complete successfully (0) or be terminated
self.assertIn(
info.returncode,
[0, -2, -15],
f"Spawner {process_name} exited with {info.returncode}",
)
else:
self.assertIn(
info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}"
)

print("[POST-SHUTDOWN] ? All processes exited as expected")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this verbose test case necessary? Wouldn't this one be also fine?

Suggested change
print("\n[POST-SHUTDOWN] Process exit codes:")
for process_name in proc_info.process_names():
info = proc_info[process_name]
print(f" {process_name}: {info.returncode}")
for process_name in proc_info.process_names():
info = proc_info[process_name]
if "ros2_control_node" in process_name:
self.assertEqual(
info.returncode, 0, f"{process_name} exited with {info.returncode}"
)
elif "spawner" in process_name:
# Spawner should complete successfully (0) or be terminated
self.assertIn(
info.returncode,
[0, -2, -15],
f"Spawner {process_name} exited with {info.returncode}",
)
else:
self.assertIn(
info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}"
)
print("[POST-SHUTDOWN] ? All processes exited as expected")
launch_testing.asserts.assertExitCodes(proc_info)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes intentional?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just the clean-up the old changes I made but they should be moved to the subfolder /test_launch_utils now. Thanks.


# URDF path (pathlib version, no xacro)
urdf = (
Path(get_package_share_directory("ros2_control_test_assets"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Path(get_package_share_directory("ros2_control_test_assets"))
PathSubstitution(FindPackageShare("ros2_control_test_assets"))

From

from launch.substitutions import PathSubstitution
from launch_ros.substitutions import FindPackageShare

would be the ROS-ish way.


# Path to combined YAML
robot_controllers = (
Path(get_package_prefix("controller_manager"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Path(get_package_prefix("controller_manager"))
PathSubstitution(FindPackagePrefix("controller_manager"))

from

from launch_ros.substitutions import FindPackagePrefix

Comment on lines 69 to 107
robot_state_pub_node = launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
)

# ===== START CONTROLLER MANAGER (ros2_control_node) =====
control_node = launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
)

# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())

# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")

# Correct function name and call
spawner_ld = generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
)

# ===== CREATE LAUNCH DESCRIPTION =====
ld = LaunchDescription(
[robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities
)

# Return tuple with launch description and test context
return ld, {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
robot_state_pub_node = launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
)
# ===== START CONTROLLER MANAGER (ros2_control_node) =====
control_node = launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
)
# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())
# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")
# Correct function name and call
spawner_ld = generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
)
# ===== CREATE LAUNCH DESCRIPTION =====
ld = LaunchDescription(
[robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities
)
# Return tuple with launch description and test context
return ld, {
# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())
# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")
# Return tuple with launch description and test context
return LaunchDescription([
launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
),
launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
),
generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
),
ReadyToTest()]), {

Do we need the variables, can't we just directly return them?

Comment on lines 107 to 112
return ld, {
"controller_list": controller_list, # Key name updated to match the test function
"robot_controllers": robot_controllers,
"urdf_file": urdf,
"temp_dir": temp_dir,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return ld, {
"controller_list": controller_list, # Key name updated to match the test function
"robot_controllers": robot_controllers,
"urdf_file": urdf,
"temp_dir": temp_dir,
}
return ld

Why returning this? (I'm not very familiar with pytest, just following the ROS integration tests tutorials).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @christophfroehlich for the detailed review. I will try to look into it in the next two days.

Noel215 and others added 4 commits November 21, 2025 02:16
@robkwan
Copy link
Author

robkwan commented Nov 21, 2025

Hi @christophfroehlich, after changing to use the PathSubstitution approach, I found that there're more changes involved or it would fail for the difference between path and string as I could see from the colcon test-result --verbose. Therefore, I hope that it is ok this time. If more change is needed and it is not urgent, I can resume after I come back from my trip in early Jan. Thanks.

Copy link
Member

@christophfroehlich christophfroehlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some issue with the git history here. I suggest that you

  • interactive rebase from 27e3b68
  • drop all commits which aren't yours
  • merge origin/master into your fork
  • force push

@mergify
Copy link
Contributor

mergify bot commented Nov 23, 2025

This pull request is in conflict. Could you fix it @robkwan?

@robkwan
Copy link
Author

robkwan commented Nov 24, 2025

Hi @christophfroehlich , thanks for the comment! I will try to do as suggested after I will be back home after new year then...Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants