Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ros2 topic pub yaml file input #925

Merged
merged 4 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def add_arguments(self, parser, cli_name):
group.add_argument(
'--stdin', action='store_true',
help='Read values from standard input')
group.add_argument(
'--yaml-file', type=str, default=None,
help='YAML file that has message contents, '
'e.g STDOUT from ros2 topic echo <topic>')
parser.add_argument(
'-r', '--rate', metavar='N', type=positive_float, default=1.0,
help='Publishing rate in Hz (default: 1)')
Expand Down Expand Up @@ -131,6 +135,7 @@ def main(args):
args.message_type,
args.topic_name,
values,
args.yaml_file,
1. / args.rate,
args.print,
times,
Expand All @@ -146,6 +151,7 @@ def publisher(
message_type: MsgType,
topic_name: str,
values: dict,
yaml_file: str,
period: float,
print_nth: int,
times: int,
Expand All @@ -159,9 +165,14 @@ def publisher(
msg_module = get_message(message_type)
except (AttributeError, ModuleNotFoundError, ValueError):
raise RuntimeError('The passed message type is invalid')
values_dictionary = yaml.safe_load(values)
if not isinstance(values_dictionary, dict):
return 'The passed value needs to be a dictionary in YAML format'

msg_reader = None
if yaml_file:
msg_reader = read_msg_from_yaml(yaml_file)
else:
values_dictionary = yaml.safe_load(values)
if not isinstance(values_dictionary, dict):
return 'The passed value needs to be a dictionary in YAML format'

pub = node.create_publisher(msg_module, topic_name, qos_profile)

Expand All @@ -184,15 +195,38 @@ def publisher(
total_wait_time += DEFAULT_WAIT_TIME

msg = msg_module()
try:
timestamp_fields = set_message_fields(
msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)
timestamp_fields = None

if not msg_reader:
# Set the static message from specified values once
try:
timestamp_fields = set_message_fields(
msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)

print('publisher: beginning loop')
count = 0
more_message = True

def timer_callback():
if msg_reader:
# Try to read out the contents for each message
try:
one_msg = next(msg_reader)
if not isinstance(one_msg, dict):
print('The contents in YAML file need to be a YAML format')
except StopIteration:
nonlocal more_message
more_message = False
return
# Set the message with contents
try:
nonlocal timestamp_fields
timestamp_fields = set_message_fields(
msg, one_msg, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)
stamp_now = node.get_clock().now().to_msg()
for field_setter in timestamp_fields:
field_setter(stamp_now)
Expand All @@ -205,11 +239,20 @@ def timer_callback():
timer_callback()
if times != 1:
timer = node.create_timer(period, timer_callback)
while times == 0 or count < times:
while (times == 0 or count < times) and more_message:
rclpy.spin_once(node)
# give some time for the messages to reach the wire before exiting
time.sleep(keep_alive)
node.destroy_timer(timer)
else:
# give some time for the messages to reach the wire before exiting
time.sleep(keep_alive)


def read_msg_from_yaml(yaml_file):
with open(yaml_file, 'r') as f:
for document in yaml.load_all(f, Loader=yaml.FullLoader):
if document is None:
continue # Skip if there's no more document

yield document
10 changes: 10 additions & 0 deletions ros2topic/test/resources/chatter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
data: 'Hello ROS Users'
---
---
data: Hello ROS Developers
---
data: Hello ROS Developers
---
---
data: 'Hello ROS Users'
39 changes: 39 additions & 0 deletions ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import functools
import pathlib
import sys
import unittest

Expand Down Expand Up @@ -49,6 +50,8 @@
TEST_NODE = 'cli_echo_pub_test_node'
TEST_NAMESPACE = 'cli_echo_pub'

TEST_RESOURCES_DIR = pathlib.Path(__file__).resolve().parent / 'resources'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
Expand Down Expand Up @@ -294,6 +297,42 @@ def test_pub_times(self, launch_service, proc_info, proc_output):
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_yaml(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
# yaml file prevails to the values 'data: hello'
cmd=(['ros2', 'topic', 'pub', '/clitest/topic/chatter',
'std_msgs/String', '--yaml-file',
str(TEST_RESOURCES_DIR / 'chatter.yaml')]),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=10)
assert command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='Hello ROS Users')",
'',
"publishing #2: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #3: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #4: std_msgs.msg.String(data='Hello ROS Users')",
'',
],
text=command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_echo_basic(self, launch_service, proc_info, proc_output):
params = [
Expand Down