Skip to content

Commit

Permalink
Migrate test_rclcpp tests to new launch_testing API.
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
hidmic committed Mar 27, 2019
1 parent da8d032 commit f23ba1d
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 85 deletions.
11 changes: 7 additions & 4 deletions test_rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()

find_package(ament_cmake_pytest REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)

set(message_files
"msg/UInt32.msg"
Expand Down Expand Up @@ -96,8 +96,9 @@ if(BUILD_TESTING)
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}.py.configure"
)
ament_add_pytest_test(${test_name}${target_suffix}
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$<CONFIG>.py"
TARGET "${test_name}${target_suffix}"
APPEND_LIBRARY_DIRS "${append_library_dirs}"
${_ARG_UNPARSED_ARGUMENTS}
)
Expand All @@ -122,8 +123,9 @@ if(BUILD_TESTING)
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_n_nodes${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_n_nodes${target_suffix}.py.configure"
)
ament_add_pytest_test(test_n_nodes${target_suffix}
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_n_nodes${target_suffix}_$<CONFIG>.py"
TARGET test_n_nodes${target_suffix}
APPEND_LIBRARY_DIRS "${append_library_dirs}"
${_ARG_UNPARSED_ARGUMENTS}
)
Expand Down Expand Up @@ -154,8 +156,9 @@ if(BUILD_TESTING)
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}.py.configure"
)
ament_add_pytest_test(${test_name}${target_suffix}
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$<CONFIG>.py"
TARGET ${test_name}${target_suffix}
${ARGN}
APPEND_LIBRARY_DIRS "${append_library_dirs}"
ENV
Expand Down
2 changes: 2 additions & 0 deletions test_rclcpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
<test_depend>ament_lint_common</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>rclcpp</test_depend>
<test_depend>rmw_implementation</test_depend>
<test_depend>rmw_implementation_cmake</test_depend>
Expand Down
80 changes: 40 additions & 40 deletions test_rclcpp/test/test_executable_output.py.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,54 @@
# generated code does not contain a copyright notice

from launch import LaunchDescription
from launch import LaunchService
from launch.actions import ExecuteProcess
from launch_testing.legacy import LaunchTestService
from launch_testing.legacy.output import create_output_lines_filter
from launch_testing.legacy.output import create_output_test_from_file
from launch.actions import OpaqueFunction

import launch_testing
import launch_testing.asserts
import launch_testing_ros

def @TEST_NAME@():
launch_test = LaunchTestService()
launch_description = LaunchDescription()
import unittest

action = launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=['@TEST_EXECUTABLE@'],
name='@TEST_EXECUTABLE_NAME@',
sigterm_timeout='15',
output='screen'
), exit_allowed=True
)

launch_test.add_output_test(
launch_description, action,
output_test=create_output_test_from_file(
output_file='@TEST_EXECUTABLE_EXPECTED_OUTPUT@'
),
output_filter=create_output_lines_filter(
filtered_rmw_implementation='@rmw_implementation@'
)
def generate_test_description(ready_fn):
launch_description = LaunchDescription()
proc_under_test = ExecuteProcess(
cmd=['@TEST_EXECUTABLE@'],
name='@TEST_EXECUTABLE_NAME@',
sigterm_timeout='15',
output='screen'
)

launch_test.add_output_test(
launch_description, action,
output_test=create_output_test_from_file(
output_file='@TEST_EXECUTABLE_TRIGGER_SHUTDOWN_OUTPUT@'
),
output_filter=create_output_lines_filter(
filtered_rmw_implementation='@rmw_implementation@'
),
test_suffix='trigger_shutdown_output',
side_effect='shutdown'
launch_description.add_action(proc_under_test)
launch_description.add_action(
OpaqueFunction(function=lambda context: ready_fn())
)
return launch_description, locals()

launch_service = LaunchService()
launch_service.include_launch_description(launch_description)
rc = launch_test.run(launch_service)

assert rc == 0, "The launch file failed with exit code '" + str(rc) + "'. "
class TestExecutableOutput(unittest.TestCase):

def @TEST_NAME@(self, proc_under_test):
output_filter = launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation='@rmw_implementation@'
)
self.proc_output.assertWaitFor(
expected_output=launch_testing.tools.expected_output_from_file(
path='@TEST_EXECUTABLE_TRIGGER_SHUTDOWN_OUTPUT@'
), process=proc_under_test, output_filter=output_filter, timeout=10
)


@launch_testing.post_shutdown_test()
class TestExecutableOutputAfterShutdown(unittest.TestCase):

if __name__ == '__main__':
@TEST_NAME@()
def @TEST_NAME@(self, proc_under_test):
output_filter = launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation='@rmw_implementation@'
)
launch_testing.asserts.assertInStdout(
self.proc_output,
expected_output=launch_testing.tools.expected_output_from_file(
path='@TEST_EXECUTABLE_EXPECTED_OUTPUT@'
), process=proc_under_test, output_filter=output_filter
)
47 changes: 28 additions & 19 deletions test_rclcpp/test/test_n_nodes.py.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import os

from launch import LaunchDescription
from launch import LaunchService
from launch.actions import ExecuteProcess
from launch_testing.legacy import LaunchTestService
from launch.actions import OpaqueFunction

import launch_testing
import launch_testing.asserts
import launch_testing_ros

def test_@TEST_NUM_NODES@_nodes():
launch_test = LaunchTestService()
import unittest


def generate_test_description(ready_fn):
launch_description = LaunchDescription()

env = None
Expand All @@ -18,29 +22,34 @@ def test_@TEST_NUM_NODES@_nodes():
env['RMW_IMPLEMENTATION'] = '@TEST_RMW_IMPLEMENTATION@'

for i in range(@TEST_NUM_NODES@):
launch_test.add_fixture_action(
launch_description, ExecuteProcess(
launch_description.add_action(
ExecuteProcess(
cmd=['@TEST_EXECUTABLE1@'],
name='node_with_name_' + str(i),
env=env,
)
)

cmd = ['@TEST_EXECUTABLE2@'] + '@TEST_NUM_NODES@'.split(' ')
launch_test.add_test_action(
launch_description, ExecuteProcess(
cmd=cmd,
name='node_check_names',
env=env
)
checking_process = ExecuteProcess(
cmd=['@TEST_EXECUTABLE2@'] + '@TEST_NUM_NODES@'.split(' '),
name='node_check_names',
env=env
)
launch_description.add_action(checking_process)
launch_description.add_action(
OpaqueFunction(function=lambda context: ready_fn())
)
return launch_description, locals()


class TestNNodes(unittest.TestCase):

launch_service = LaunchService()
launch_service.include_launch_description(launch_description)
rc = launch_test.run(launch_service)
def test_@TEST_NUM_NODES@_nodes(self, checking_process):
self.proc_info.assertWaitForShutdown(checking_process, timeout=10)

assert rc == 0, "The launch file failed with exit code '" + str(rc) + "'. "

@launch_testing.post_shutdown_test()
class TestNNodesAfterShutdown(unittest.TestCase):

if __name__ == '__main__':
test_@TEST_NUM_NODES@_nodes()
def test_@TEST_NUM_NODES@_nodes(self):
launch_testing.asserts.assertExitCodes(self.proc_info)
56 changes: 34 additions & 22 deletions test_rclcpp/test/test_two_executables.py.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import os

from launch import LaunchDescription
from launch import LaunchService
from launch.actions import ExecuteProcess
from launch_testing.legacy import LaunchTestService
from launch.actions import OpaqueFunction

import launch_testing
import launch_testing.asserts

def @TEST_NAME@():
launch_test = LaunchTestService()
import unittest


def generate_test_description(ready_fn):
launch_description = LaunchDescription()

cmd = ['@TEST_EXECUTABLE1@']
Expand All @@ -19,13 +22,12 @@ def @TEST_NAME@():
if '@TEST_RMW_IMPLEMENTATION1@':
env = dict(os.environ)
env['RMW_IMPLEMENTATION'] = '@TEST_RMW_IMPLEMENTATION1@'
launch_test.add_fixture_action(
launch_description, ExecuteProcess(
cmd=cmd,
name='@TEST_EXECUTABLE1_NAME@',
env=env,
)
executable_1 = ExecuteProcess(
cmd=cmd,
name='@TEST_EXECUTABLE1_NAME@',
env=env,
)
launch_description.add_action(executable_1)

cmd = ['@TEST_EXECUTABLE2@']
if '@TEST_EXECUTABLE2_ARGS@':
Expand All @@ -34,20 +36,30 @@ def @TEST_NAME@():
if '@TEST_RMW_IMPLEMENTATION2@':
env = dict(os.environ)
env['RMW_IMPLEMENTATION'] = '@TEST_RMW_IMPLEMENTATION2@'
launch_test.add_test_action(
launch_description, ExecuteProcess(
cmd=cmd,
name='@TEST_EXECUTABLE2_NAME@',
env=env,
)
executable_2 = ExecuteProcess(
cmd=cmd,
name='@TEST_EXECUTABLE2_NAME@',
env=env,
)

launch_service = LaunchService()
launch_service.include_launch_description(launch_description)
rc = launch_test.run(launch_service)
launch_description.add_action(executable_2)

launch_description.add_action(
OpaqueFunction(function=lambda context: ready_fn())
)
return launch_description, locals()


class TestTwoExecutables(unittest.TestCase):

def @TEST_NAME@(self, executable_2):
"""Test that the second executable terminates after a finite amount of time."""
self.proc_info.assertWaitForShutdown(process=executable_2, timeout=10)

assert rc == 0, "The launch file failed with exit code '" + str(rc) + "'. "

@launch_testing.post_shutdown_test()
class TestTwoExecutablesAfterShutdown(unittest.TestCase):

if __name__ == '__main__':
@TEST_NAME@()
def @TEST_NAME@(self):
"""Test that both executables finished cleanly."""
launch_testing.asserts.assertExitCodes(self.proc_info)

0 comments on commit f23ba1d

Please sign in to comment.