Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds example tests for launch descriptions #165

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions launch_ros/examples/lifecycle_pub_sub_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import lifecycle_msgs.msg


def main(argv=sys.argv[1:]):
"""Main."""
def generate_launch_description() -> launch.LaunchDescription:
ld = launch.LaunchDescription()

# Prepare the talker node.
Expand Down Expand Up @@ -84,6 +83,14 @@ def main(argv=sys.argv[1:]):
ld.add_action(talker_node)
ld.add_action(emit_event_to_request_that_talker_does_configure_transition)

return ld


def main(argv=sys.argv[1:]):
"""Main."""

ld = generate_launch_description()

print('Starting introspection of launch description...')
print('')

Expand Down
23 changes: 16 additions & 7 deletions launch_ros/examples/pub_sub_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) # noqa
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'launch')) # noqa

import launch

from launch import LaunchDescription
from launch import LaunchIntrospector
from launch import LaunchService
Expand All @@ -27,17 +29,24 @@
import launch_ros.actions


def main(argv=sys.argv[1:]):
"""Main."""
ld = LaunchDescription([
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
launch_ros.actions.Node(
package='demo_nodes_cpp', node_executable='talker', output='screen',
remappings=[('chatter', 'my_chatter')]),
package='demo_nodes_cpp', node_executable='talker',
output='screen', remappings=[('chatter', 'my_chatter')]
),
launch_ros.actions.Node(
package='demo_nodes_cpp', node_executable='listener', output='screen',
remappings=[('chatter', 'my_chatter')]),
package='demo_nodes_cpp', node_executable='listener',
output='screen', remappings=[('chatter', 'my_chatter')]
),
])


def main(argv=sys.argv[1:]):
"""Main."""

ld = generate_launch_description()

print('Starting introspection of launch description...')
print('')

Expand Down
230 changes: 230 additions & 0 deletions launch_testing/examples/test_pub_sub_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Some example tests around ROS 2 talker/listener nodes.

.. note::

To maximize code reuse, this test assumes that execution and executable
abstractions are decoupled (see https://github.com/ros2/launch/issues/114).
"""

import sys
import pytest

from launch import LaunchDescription
from launch.actions import Execute
from launch.events.process import ProcessStarted
from launch.launch_description_sources import \
get_launch_description_from_python_launch_file
from launch_ros import get_default_launch_description
from launch_ros.executables import Node
from launch_ros.executables import ComposableNode
from launch_ros.executables import ComposableNodeProcess
import launch_ros.events.lifecycle as lifecyle

from launch_testing import TestLaunchService
from launch_testing.actions import PyTest
from launch_testing.actions import GTest
from launch_testing.actions import Assert
from launch_testing.actions import AssertSequenceOnce

import launch_testing.predicates as predicates
import launch_testing.variables as variables


@pytest.fixture
def ros_test_launch_service() -> TestLaunchService:
"""
A test fixture providing a test-aware launch service (i.e. one
that listens to events fired by `launch_testing.actions.Assert`
and `launch_testing.actions.Test` actions and reacts accordingly).
"""
ls = TestLaunchService()
# TODO(hidmic): consider integrating ROS specific actions and the
# launch service in some way to ease launch file composition without
# repetition).
ls.include_launch_description(
get_default_launch_description(prefix_output_with_name=False)
)
return ls


@pytest.fixture
def pub_sub_launch_description() -> LaunchDescription:
"""
A test fixture providing the basic pub/sub launch description.
"""
# TODO(hidmic): implement functionality to lookup launch files
# on a per package basis
launch_path = get_launch_path(
package_name='launch_ros', launch_file_path='pub_sub_launch.py'
)
return get_launch_description_from_python_launch_file(launch_path)


def test_pub_sub_launch(ros_test_launch_service, pub_sub_launch_description):
"""
Tests an existing simple pub/sub system.
"""
# Launches a pytest that also happens to be a node with
# a 30s timeout.
pub_sub_launch_description.add_action(
# TODO(hidmic): allow launching non-installed
# executables within the current package.
PyTest(Node(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Node is the write argument for PyTest (or GTest below). The actual Python script containing tests or the executable containing a gtest main function are not necessarily nodes.

Copy link
Contributor Author

@hidmic hidmic Jan 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly the point I want to make. Both the pytest script and the gtest executable could be nodes, containers of composable nodes, plain ROS-agnostic programs, etc. So we'd want to reuse what we have in terms of doing file lookups, argument passing, output handling, etc. If you think about it, all the PyTest and GTest actions have to do is execute whatever is being passed in a single local process (for starters), pass specific command line arguments and collect whatever output the executable is expected to generate. All assuming that what we passed is what we say it is.

So above, where it says Node, it could say Process (i.e. any non ROS aware executable).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So above, where it says Node, it could say Process (i.e. any non ROS aware executable).

Maybe I am missing how this is supposed to work.

A Python script being a node will have a __main__ which get the node running. On the other hand a Python file containing unit tests invoked by pytest doesn't. Also such a test file wouldn't be represented by Process (since it doesn't have a __main__). In fact a Python test file can't even be invoked - it commonly doesn't have an executable flag. So e.g. node_executable doesn't apply here - neither can you pass command line arguments of any kind.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically for pytest, I think Dirk might be right. For gtest, I do think it makes sense to have an argument that inherits from the proposed Executable or ExecuteProcess (as Node does).

Copy link
Contributor Author

@hidmic hidmic Jan 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Python script being a node will have a main which get the node running. On the other hand a Python file containing unit tests invoked by pytest doesn't. Also such a test file wouldn't be represented by Process (since it doesn't have a main).

That's mostly true, though not strictly. The following script can be made executable to run all the tests in it:

#! /usr/bin/env python                                                          
                                                                                
import os                                                                       
import pytest                                                                   
                                                                                
                                                                                
def test_stuff():                                                               
    assert False                                                                
                                                                                
                                                                                
pytest.main([os.path.realpath(__file__)])

But, set aside above's unusual setup, you're right about pytest test files not being executable in general. And we do have a problem with command line arguments, though not an insurmountable one.


The initial idea was to describe and parameterize different aspects of the same executable through decoration. But pytest breaks the pattern in a way that I believe would require quite a bit of API refactoring in other parts of the launch system to cope with.

So, as I see it, this leaves us with few options (that I'm going to sketch for both gtest and pytest as I'd rather have a consistent API):

  1. We derive a test version of each executable kind we have and we think of testing frameworks as testing protocols (e.g. how to execute, how to collect results, etc.):
NodeTest(
     path='path/to/test.py', protocol='pytest', 
     timeout=20, remapping=[('chatter', 'my_chatter')]
 ),
ComposableNodeTest(
     container_executable='intra_proc_comm_test_container',
     composable_node_descriptions=[
         ComposableNode(
             package_name='composition',
             node_plugin_name='talker_component',
             name='my_talker_component'
         ),
         ComposableNode(
             package_name='composition',
             node_plugin_name='listener_component'
             name='my_listener_component'
         ),
      protocol='gtest', timeout=10
   ]
)
  1. We outright ban test parameterization at the launch level.
PyTest(path='path/to/test.py', timeout=10),
GTest(path='path/to/gtest', timeout=10)

(1) is somewhat complex, but it enables full launch API reuse. It's an alternative to specialization (i.e. inheritance) of each executable kind but with less code repetition. (2) is easy to use and to implement, though a bit limiting in my opinion.

I guess the question is whether we want to provide all the tools to parameterize tests or if we just to want to cover the basic use cases and then leave the users on their own to come up with custom ways of doing so.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment I am not aware of any use case where we need ComposableNodeTest. In the future there could certainly be test using that approach. For now I would rather focus on the cases which we do have a use case for.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, went with the simple one.

node_executable='pub_test.py',
remappings=[('chatter', 'my_chatter')]
), timeout=30)
)
# Launches a gtest that also happens to be a node with
# a 30s timeout.
pub_sub_launch_description.add_action(
GTest(Node(
node_executable='sub_test',
remappings=[('chatter', 'my_chatter')]
), timeout=30)
)

ros_test_launch_service.include_launch_description(pub_sub_launch_description)

# TODO(hidmic): implement launch_testing specific pytest plugin to aggregate
# all test result information at the launch system-level and below.
assert ros_test_launch_service.run() == 0


@pytest.fixture
def lifecycle_pub_sub_launch_description() -> LaunchDescription:
"""
A test fixture providing the lifecycle pub/sub launch description.
"""
# TODO(hidmic): implement functionality to lookup launch files
# on a per package basis
launch_path = get_launch_path(
package_name='launch_ros',
launch_file_path='lifecycle_pub_sub_launch.py'
)
return get_launch_description_from_python_launch_file(launch_path)


def test_lifecycle_pub_sub_launch(ros_test_launch_service,
lifecycle_pub_sub_launch_description):
"""
Tests an existing lifecycle pub/sub nodes setup for execution
in proper sequence.
"""
# Asserts that a lifecycle.ChangeState event within a 30s window
# is followed by a process.ProcessStarted event within a 10s window
# once (40s total timeout).
lifecycle_pub_sub_launch_description.add_action(
AssertSequenceOnce([
variables.EventEmitted(
event_type=lifecycle.ChangeState
),
variables.EventEmitted(
event_type=ProcessStarted
)
], timeout=[30, 10])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting, and I like it, but the the generic name really needs a comment (as you've put here), otherwise I wouldn't have an idea of what's supposed to be doing. Not really a criticism but just an observation.

Also, would it be valuable to ensure that the events came from a specific process? (or node in the case of the lifecycle change) Like in the case of a multi-process test?

In which case, would it make sense to couple this assertion to the thing being tested? Either with the Node/ExecuteProcess being an argument to this action or with this being somehow an "peer" argument to another Test class, e.g. (Test(Node(), [<condition 1>, ....]) where this AssertSequenceOnce would be one of the conditions to test, related to this node). But then again, there might be "global" cases to test, or test cases which apply across multiple nodes, for example. Not sure there's anything to do, just thinking out loud.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting, and I like it, but the the generic name really needs a comment

Sure. I'm open to any naming changes. How would you briefly state that you're asserting that N conditions occur in the right sequence/order?

Also, would it be valuable to ensure that the events came from a specific process?

For sure. Don't take these examples as a completely exhaustive demonstration of all the (future) API features. In this case, we could as easily add some matching to only check for the source (i.e. the executable) of interest.

In which case, would it make sense to couple this assertion to the thing being tested?

I did play with this idea. It wasn't clear to me either how to deal with multiple assertions of different kinds, potentially targeting multiple actions. That's why I dropped it in favor of using the target action instances directly in the predicates being asserted. We could have something similar to what you describe though, by deferring variable binding. E.g. instead of:

Assert(len(predicates.regex_match(
    variables.Output(target), pattern='^Oops!.*'
)) > 10, timeout=10)

we could write:

Test(target=Node(), description=[
    Assert(len(predicate.regex_match(
        variables.Output(), pattern='Oops!.*'
    )) > 10, timeout=10)
])

It may get somewhat confusing though if you start mixing both on the same launch file. The former is the most general one, so I don't think we can replace it with the latter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I'm open to any naming changes. How would you briefly state that you're asserting that N conditions occur in the right sequence/order?

I don't have an alternative in mind, but I'll keep thinking on it.

The rest makes sense to me, you can realize those cases when/if we need them.

)

ros_test_launch_service.include_launch_description(
lifecycle_pub_sub_launch_description
)
# TODO(hidmic): implement launch_testing specific pytest plugin to
# aggregate all test result information at the launch system-level
# and below.
assert ros_test_launch_service.run() == 0


def test_pub_sub(ros_test_launch_service):
"""
Tests a basic pub/sub setup for proper output matching.
"""
ld = launch.LaunchDescription()
talker_node_action = Execute(
launch_ros.actions.Node(
package='demo_nodes_cpp',
node_executable='talker',
remappings=[('chatter', 'my_chatter')]
), output='screen'
)
ld.add_action(talker_node_action)
listener_node_action = Execute(
launch_ros.actions.Node(
package='demo_nodes_py',
node_executable='listener',
remappings=[('chatter', 'my_chatter')]
), output='screen'
)
ld.add_action(listener_node_action)
# Asserts that not a single talker screen output
# line contains the ERROR string throughout the duration
# of the test (i.e. it completes when all other tests
# have completed). The assertion is strict (i.e. failure
# on a single output event means failure altogether).
ld.add_action(
Assert(not predicates.regex_match(
variables.Output(talker_node_action),
pattern='.*ERROR.*', flags=None
), timeout=None, strict=True)
)
# Asserts that a the listener outputs 'I heard...'
# at least once in the first 10s of the test. The
# assertion is not strict (i.e. failure on a single
# output event does not mean failure altogether).
ld.add_action(
Assert(predicates.count(predicates.regex_match(
variables.Output(listener_node_action),
pattern='I heard.*', flags=None
)) > 0, message='No message received!', timeout=10)
)
ros_test_launch_service.include_launch_description(ld)

# TODO(hidmic): implement launch_testing specific pytest plugin to
# aggregate all test result information at the launch system-level
# and below.
assert ros_test_launch_service.run() == 0


def test_intra_comm_pub_sub(ros_test_launch_service):
"""
Tests a intra process communication between composable
pub/sub nodes via gtest.
"""
# Launches a gtest that happens to be a container
# for composable nodes.
ld = LaunchDescription([
GTest(ComposableNodeProcess(
dirk-thomas marked this conversation as resolved.
Show resolved Hide resolved
container_executable='intra_proc_comm_test_container',
composable_node_descriptions=[
ComposableNode(
package_name='composition',
node_plugin_name='talker_component',
name='my_talker_component'
),
ComposableNode(
package_name='composition',
node_plugin_name='listener_component'
name='my_listener_component'
),
]
), timeout=40),
])
ros_test_launch_service.include_launch_description(ld)

# TODO(hidmic): implement launch_testing specific pytest plugin to
# aggregate all test result information at the launch system-level
# and below.
assert ros_test_launch_service.run() == 0
90 changes: 90 additions & 0 deletions launch_testing/examples/test_system_launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import pytest

from datetime import timedelta

from launch import LaunchDescription
from launch.actions import Execute
from launch.actions import RegisterEventHandler
from launch.executables import Process
from launch.event_handlers import OnProcessExited
from launch_testing import TestLaunchService
from launch_testing.actions import AssertOnce

import launch_testing.predicates as predicates
import launch_testing.variables as variables


@pytest.fixture
def launch_service() -> TestLaunchService:
"""
A test fixture providing a test-aware launch service (i.e. one
that listens to events fired by `launch_testing.actions.Assert`
and `launch_testing.actions.Test` actions and reacts accordingly).
"""
return TestLaunchService()


def test_file_compression(launch_service):
ld = LaunchDescription()

prelist_bags_action = Execute(Process(
cmd='ls *.bag', cwd='/var/log/bags', shell=True
), output='screen')
ld.add_action(prelist_bags_action)

compression_action = Execute(Process(
cmd='bzip2 /var/log/bags/*.bag', shell=True
), output='screen', prefix='time')
ld.add_action(compression_action)

@predicates.custom
def parse_timedelta(string_value):
match = re.match(
'(?P<minutes>\d+)m(?P<seconds>\d\.\d{3})', string_value
)
return timedelta(**{
name: float(value) if value else 0
for name, value in match.groupdict()
})

ld.add_action(
AssertOnce(
parse_timedelta(predicates.regex_match(
variables.Output(compression_action),
pattern='^real (\d+m\d\.\d{3}s)$', group=1
)) < timedelta(minutes=1, seconds=30) and
parse_timedelta(predicates.regex_match(
variables.Output(compression_action),
pattern='^sys (\d+m\d\.\d{3}s)$', group=1
)) < timedelta(seconds=30), timeout=120
)
)

postlist_bags_action = Execute(Process(
cmd='ls *.bag.bz2', cwd='/var/log/bags', shell=True
), output='screen')
assert_all_bags_compressed = AssertOnce(
predicates.count(variables.Output(postlist_bags_action))
== predicates.count(variables.Output(prelist_bags_action))
)
ld.add_action(RegisterEventHandler(OnProcessExit(
target_action=compression_action,
on_exit=[postlist_bags_action, assert_all_bags_compressed]
)))

assert launch_service.run() == 0