From 0821f139179500682661705c79bf327a92e459ef Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 27 May 2021 12:44:52 -0300 Subject: [PATCH 1/6] Add option in ros2 topic pub to wait for N matching subscriptions, use N=1 by default Signed-off-by: Ivan Santiago Paunovic --- ros2topic/ros2topic/verb/pub.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index 346c9d0c9..ddbaab29a 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -80,6 +80,9 @@ def add_arguments(self, parser, cli_name): group.add_argument( '-1', '--once', action='store_true', help='Publish one message and exit') + group.add_argument( + '-m', '--wait-matching-subscriptions', type=nonnegative_int, default=1, + help='Wait until finding the specified number of matching subscriptions') group.add_argument( '-t', '--times', type=nonnegative_int, default=0, help='Publish this number of times and then exit') @@ -146,6 +149,7 @@ def main(args): 1. / args.rate, args.print, times, + args.wait_matching_subscriptions, qos_profile, args.keep_alive) @@ -158,6 +162,7 @@ def publisher( period: float, print_nth: int, times: int, + wait_matching_subscriptions: int, qos_profile: QoSProfile, keep_alive: float, ) -> Optional[str]: @@ -172,6 +177,14 @@ def publisher( pub = node.create_publisher(msg_module, topic_name, qos_profile) + times_since_last_log = 0 + while pub.get_subscription_count() < wait_matching_subscriptions: + # Print a message reporting we're waiting each 1s, check condition each 100ms. + if not times_since_last_log: + print(f'Waiting for {wait_matching_subscriptions} matching subscription(s)...') + times_since_last_log = (times_since_last_log + 1) % 10 + time.sleep(0.1) + msg = msg_module() try: set_message_fields(msg, values_dictionary) @@ -188,8 +201,6 @@ def timer_callback(): print('publishing #%d: %r\n' % (count, msg)) pub.publish(msg) - # give some time for discovery process - time.sleep(0.1) timer_callback() if times != 1: timer = node.create_timer(period, timer_callback) From 19ce63631a8c8c47cdd74175ccb05ff9ccab3211 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 28 May 2021 16:36:50 -0300 Subject: [PATCH 2/6] Fix tests, add new test case Signed-off-by: Ivan Santiago Paunovic --- ros2topic/ros2topic/verb/pub.py | 6 ++-- ros2topic/test/test_cli.py | 51 +++++++++++++++++++++++++++++++-- ros2topic/test/test_echo_pub.py | 4 ++- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index ddbaab29a..f07037c04 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -80,12 +80,12 @@ def add_arguments(self, parser, cli_name): group.add_argument( '-1', '--once', action='store_true', help='Publish one message and exit') - group.add_argument( - '-m', '--wait-matching-subscriptions', type=nonnegative_int, default=1, - help='Wait until finding the specified number of matching subscriptions') group.add_argument( '-t', '--times', type=nonnegative_int, default=0, help='Publish this number of times and then exit') + parser.add_argument( + '-m', '--wait-matching-subscriptions', type=nonnegative_int, default=1, + help='Wait until finding the specified number of matching subscriptions') parser.add_argument( '--keep-alive', metavar='N', type=positive_float, default=0.1, help='Keep publishing node alive for N seconds after the last msg ' diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py index 53b9fa35b..82da90664 100644 --- a/ros2topic/test/test_cli.py +++ b/ros2topic/test/test_cli.py @@ -660,7 +660,7 @@ def test_topic_pub(self): 'publisher: beginning loop', "publishing #1: std_msgs.msg.String(data='foo')", '' - ], strict=True + ] ), timeout=10) assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ @@ -686,7 +686,7 @@ def test_topic_pub_once(self): 'publisher: beginning loop', "publishing #1: std_msgs.msg.String(data='bar')", '' - ], strict=True + ] ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) assert self.listener_node.wait_for_output(functools.partial( @@ -696,6 +696,51 @@ def test_topic_pub_once(self): ), timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + def test_topic_pub_once_matching_two_listeners( + self, launch_service, proc_info, proc_output, path_to_listener_node_script, additional_env + ): + second_listener_node_action = Node( + executable=sys.executable, + arguments=[path_to_listener_node_script], + remappings=[('chatter', 'chit_chatter')], + additional_env=additional_env, + name='second_listener', + ) + with launch_testing.tools.launch_process( + launch_service, second_listener_node_action, proc_info, proc_output + ) as second_listener_node, \ + self.launch_topic_command( + arguments=[ + 'pub', '--once', + '--keep-alive', '3', # seconds + '-m', '2', + '--qos-durability', 'transient_local', + '--qos-reliability', 'reliable', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: bar}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='bar')", + '' + ] + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + assert self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[bar\]') + ] + ), timeout=10) + assert second_listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'\[INFO\] \[\d+\.\d*\] \[second_listener\]: I heard: \[bar\]') + ] + ), timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + def test_topic_pub_print_every_two(self): with self.launch_topic_command( arguments=[ @@ -716,7 +761,7 @@ def test_topic_pub_print_every_two(self): '', "publishing #4: std_msgs.msg.String(data='fizz')", '' - ], strict=True + ] ), timeout=10), 'Output does not match: ' + topic_command.output assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index 7f41b6d87..e1f3f135c 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -157,6 +157,8 @@ def message_callback(msg): with launch_testing.tools.launch_process( launch_service, command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_patterns=[ + r'Waiting for \{[0-7]+\} matching subscription\(s\)\.\.\.'], filtered_rmw_implementation=get_rmw_implementation_identifier() ) ) as command: @@ -180,7 +182,7 @@ def message_callback(msg): @launch_testing.markers.retry_on_failure(times=5) def test_pub_times(self, launch_service, proc_info, proc_output): command_action = ExecuteProcess( - cmd=(['ros2', 'topic', 'pub', '-t', '5', '/clitest/topic/pub_times', + cmd=(['ros2', 'topic', 'pub', '-t', '5', '-m', '0', '/clitest/topic/pub_times', 'std_msgs/String', 'data: hello']), additional_env={ 'PYTHONUNBUFFERED': '1' From 3d59b36971615b8cab6f733578023b182b833864 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 28 May 2021 16:41:27 -0300 Subject: [PATCH 3/6] Rephrase log Signed-off-by: Ivan Santiago Paunovic --- ros2topic/ros2topic/verb/pub.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index f07037c04..d0a81efea 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -181,7 +181,8 @@ def publisher( while pub.get_subscription_count() < wait_matching_subscriptions: # Print a message reporting we're waiting each 1s, check condition each 100ms. if not times_since_last_log: - print(f'Waiting for {wait_matching_subscriptions} matching subscription(s)...') + print( + f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...') times_since_last_log = (times_since_last_log + 1) % 10 time.sleep(0.1) From 006c4785a916358321c5602e3cbcd28da2db569e Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 31 May 2021 18:52:24 -0300 Subject: [PATCH 4/6] Delete unneeded filter Signed-off-by: Ivan Santiago Paunovic --- ros2topic/test/test_echo_pub.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index e1f3f135c..b7fd08686 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -157,8 +157,6 @@ def message_callback(msg): with launch_testing.tools.launch_process( launch_service, command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( - filtered_patterns=[ - r'Waiting for \{[0-7]+\} matching subscription\(s\)\.\.\.'], filtered_rmw_implementation=get_rmw_implementation_identifier() ) ) as command: From bf92afecf75376ff2d08c1b39e802d875dafb93c Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 2 Jun 2021 14:29:14 -0300 Subject: [PATCH 5/6] Replace -m with -w Signed-off-by: Ivan Santiago Paunovic --- ros2topic/ros2topic/verb/pub.py | 2 +- ros2topic/test/test_cli.py | 2 +- ros2topic/test/test_echo_pub.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index d0a81efea..aeb81efde 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -84,7 +84,7 @@ def add_arguments(self, parser, cli_name): '-t', '--times', type=nonnegative_int, default=0, help='Publish this number of times and then exit') parser.add_argument( - '-m', '--wait-matching-subscriptions', type=nonnegative_int, default=1, + '-w', '--wait-matching-subscriptions', type=nonnegative_int, default=1, help='Wait until finding the specified number of matching subscriptions') parser.add_argument( '--keep-alive', metavar='N', type=positive_float, default=0.1, diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py index 82da90664..8b3b5874d 100644 --- a/ros2topic/test/test_cli.py +++ b/ros2topic/test/test_cli.py @@ -713,7 +713,7 @@ def test_topic_pub_once_matching_two_listeners( arguments=[ 'pub', '--once', '--keep-alive', '3', # seconds - '-m', '2', + '-w', '2', '--qos-durability', 'transient_local', '--qos-reliability', 'reliable', '/chit_chatter', diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index b7fd08686..6f5a95160 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -180,7 +180,7 @@ def message_callback(msg): @launch_testing.markers.retry_on_failure(times=5) def test_pub_times(self, launch_service, proc_info, proc_output): command_action = ExecuteProcess( - cmd=(['ros2', 'topic', 'pub', '-t', '5', '-m', '0', '/clitest/topic/pub_times', + cmd=(['ros2', 'topic', 'pub', '-t', '5', '-w', '0', '/clitest/topic/pub_times', 'std_msgs/String', 'data: hello']), additional_env={ 'PYTHONUNBUFFERED': '1' From 93cf8474fb4a4334a8438530adf03897cb803cb1 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 2 Jun 2021 14:34:09 -0300 Subject: [PATCH 6/6] Default to 0, only default to 1 when using --times Signed-off-by: Ivan Santiago Paunovic --- ros2topic/ros2topic/verb/pub.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index aeb81efde..7a1bc680f 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -84,8 +84,10 @@ def add_arguments(self, parser, cli_name): '-t', '--times', type=nonnegative_int, default=0, help='Publish this number of times and then exit') parser.add_argument( - '-w', '--wait-matching-subscriptions', type=nonnegative_int, default=1, - help='Wait until finding the specified number of matching subscriptions') + '-w', '--wait-matching-subscriptions', type=nonnegative_int, default=None, + help=( + 'Wait until finding the specified number of matching subscriptions. ' + 'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.')) parser.add_argument( '--keep-alive', metavar='N', type=positive_float, default=0.1, help='Keep publishing node alive for N seconds after the last msg ' @@ -149,7 +151,8 @@ def main(args): 1. / args.rate, args.print, times, - args.wait_matching_subscriptions, + args.wait_matching_subscriptions + if args.wait_matching_subscriptions is not None else int(times != 0), qos_profile, args.keep_alive)