Skip to content

Commit

Permalink
make use of TopicEndpointInfo objects
Browse files Browse the repository at this point in the history
Signed-off-by: Miaofei <miaofei@amazon.com>
  • Loading branch information
mm318 committed Jan 24, 2020
1 parent 7972ba2 commit baed2dc
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
30 changes: 7 additions & 23 deletions ros2topic/ros2topic/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSLivelinessPolicy
from rclpy.qos import QoSReliabilityPolicy

from ros2cli.node.direct import DirectNode
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension


def print_topic_endpoint_info(topic_name, get_topic_endpoint_info_func):
for info in get_topic_endpoint_info_func(topic_name):
print('Node name: %s' % info['node_name'])
print('Node namespace: %s' % info['node_namespace'])
print('Topic type: %s' % info['topic_type'])
print('GID: %s' % '.'.join(format(x, '02x') for x in info['gid']))
print('QoS profile:')
qos_profile = info['qos_profile']
print(' Reliability: %s' % QoSReliabilityPolicy(qos_profile['reliability']).name)
print(' Durability: %s' % QoSDurabilityPolicy(qos_profile['durability']).name)
print(' Lifespan: %d nanoseconds' % qos_profile['lifespan'].nanoseconds)
print(' Deadline: %d nanoseconds' % qos_profile['deadline'].nanoseconds)
print(' Liveliness: %s' % QoSLivelinessPolicy(qos_profile['liveliness']).name)
print(' Liveliness lease duration: %d nanoseconds\n' %
qos_profile['liveliness_lease_duration'].nanoseconds)

class InfoVerb(VerbExtension):
"""Print information about a topic."""

Expand All @@ -46,7 +26,9 @@ def add_arguments(self, parser, cli_name):
'topic_name',
help="Name of the ROS topic to get info (e.g. '/chatter')")
parser.add_argument(
'--verbose', '-v', action='store_true',
'--verbose',
'-v',
action='store_true',
help='Prints detailed information like the node name, node namespace, topic type, '
'GUID and QoS Profile of the publishers and subscribers to this topic')
arg.completer = TopicNameCompleter(
Expand Down Expand Up @@ -74,13 +56,15 @@ def main(self, *, args):
print('Publisher count: %d' % node.count_publishers(topic_name), end=lineend)
if args.verbose:
try:
print_topic_endpoint_info(topic_name, node.get_publishers_info_by_topic)
for info in node.get_publishers_info_by_topic(topic_name):
print(info, end=lineend)
except NotImplementedError as e:
return str(e)

print('Subscription count: %d' % node.count_subscribers(topic_name))
if args.verbose:
try:
print_topic_endpoint_info(topic_name, node.get_subscriptions_info_by_topic)
for info in node.get_subscriptions_info_by_topic(topic_name):
print(info, end=lineend)
except NotImplementedError as e:
return str(e)
3 changes: 2 additions & 1 deletion ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def test_topic_endpoint_info(self):

@launch_testing.markers.retry_on_failure(times=5)
def test_topic_endpoint_info_verbose(self):
with self.launch_topic_command(arguments=['info', '--verbose', '/chatter']) as topic_command:
with self.launch_topic_command(arguments=['info', '-v', '/chatter']) as topic_command:
assert topic_command.wait_for_shutdown(timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
Expand All @@ -284,6 +284,7 @@ def test_topic_endpoint_info_verbose(self):
re.compile(r'Node name: \w+'),
'Node namespace: /',
'Topic type: std_msgs/msg/String',
re.compile(r'Endpoint type: (INVALID|PUBLISHER|SUBSCRIPTION)'),
re.compile(r'GID: [\w\.]+'),
'QoS profile:',
re.compile(r' Reliability: RMW_QOS_POLICY_RELIABILITY_\w+'),
Expand Down

0 comments on commit baed2dc

Please sign in to comment.